summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-22 22:29:04 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-22 22:29:04 +0000
commit8b5f304474d529bce7ad7e0be8e00392baffbf8c (patch)
treed2dae11bf47d9dcca629ca0a22bb10a857264737
parentc409185407227ffd5830c03e922b61b1891219a6 (diff)
downloadrabbitmq-server-8b5f304474d529bce7ad7e0be8e00392baffbf8c.tar.gz
some inlining, cosmetics, and more explicit sequencing
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl42
2 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c65347e4..a698a435 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -505,7 +505,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- send_mandatory(Delivery),
+ send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9bcded68..1da4b0fd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -348,9 +348,9 @@ handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
-handle_cast({mandatory_received, MsgSeqNo}, State) ->
+handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
- noreply_coalesce(handle_mandatory(MsgSeqNo, State));
+ noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
handle_cast({confirm, MsgSeqNos, From}, State) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -631,10 +631,6 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
record_confirms(MXs, State#ch{unconfirmed = UC1}).
-handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) ->
- Mand1 = dtree:drop(MsgSeqNo, Mand),
- State#ch{mandatory = Mand1}.
-
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
%% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
State1 = State#ch{state = running},
@@ -1545,33 +1541,35 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
false -> dict:store(QPid, QName, QNames0)
end, pmon:monitor(QPid, QMons0)}
end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
- State1 = process_routing_confirm(
- DeliveredQPids, Confirm, MsgSeqNo, XName,
- process_routing_mandatory(
- DeliveredQPids, Mandatory, MsgSeqNo, Message,
- State#ch{queue_names = QNames1,
- queue_monitors = QMons1})),
+ State1 = State#ch{queue_names = QNames1,
+ queue_monitors = QMons1},
+ %% NB: the order here is important since basic.returns must be
+ %% sent before confirms.
+ State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo,
+ Message, State1),
+ State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
+ XName, State2),
?INCR_STATS([{exchange_stats, XName, 1} |
[{queue_exchange_stats, {QName, XName}, 1} ||
QPid <- DeliveredQPids,
{ok, QName} <- [dict:find(QPid, QNames1)]]],
- publish, State1),
- State1.
+ publish, State3),
+ State3.
-process_routing_mandatory(_, false, _MsgSeqNo, _Msg, State) ->
+process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
State;
-process_routing_mandatory([], true, _MsgSeqNo, Msg, State) ->
+process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
State;
-process_routing_mandatory(QPids, true, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(
- MsgSeqNo, QPids, Msg, State#ch.mandatory)}.
+process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+ State#ch.mandatory)}.
-process_routing_confirm(_, false, _MsgSeqNo, _XName, State) ->
+process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
State;
-process_routing_confirm([], true, MsgSeqNo, XName, State) ->
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(QPids, true, MsgSeqNo, XName, State) ->
+process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
State#ch.unconfirmed)}.