summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-22 11:55:35 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-22 11:55:35 +0000
commit3a29d0d519589d8aba1011d85ae36e93e75a2989 (patch)
tree0f26cd77ee141e4002ffaa239dc38c540e743c1a /src/rabbit_channel.erl
parent860e1265688f1d56150b6a980e3460829ae4751e (diff)
downloadrabbitmq-server-3a29d0d519589d8aba1011d85ae36e93e75a2989.tar.gz
Substantially improve both performance and correctness: 1) really don't treat presence of MsgSeqNo as a cue we are doing confirms, and 2) get the right dtree function when we receive a MsgSeqNo.
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl52
1 files changed, 29 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b862766a..346dd2e1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -633,7 +633,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) ->
- {_MMsgs, Mand1} = dtree:take_all([MsgSeqNo], Mand),
+ Mand1 = dtree:drop(MsgSeqNo, Mand),
State#ch{mandatory = Mand1}.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -708,16 +708,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(Props, State),
check_expiration_header(Props),
+ DoConfirm = Tx =/= none orelse ConfirmEnabled,
{MsgSeqNo, State1} =
- case {Tx, ConfirmEnabled orelse Mandatory} of
- {none, false} -> {undefined, State};
- {_, _} -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case DoConfirm orelse Mandatory of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_in(Message, TraceState),
- Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(
+ Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
DQ = {Delivery, QNames},
{noreply, case Tx of
@@ -1511,7 +1513,6 @@ notify_limiter(Limiter, Acked) ->
end.
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- msg_seq_no = undefined,
mandatory = false},
[]}, State) -> %% optimisation
?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
@@ -1519,6 +1520,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
mandatory = Mandatory,
+ confirmed = Confirmed,
msg_seq_no = MsgSeqNo},
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
@@ -1542,10 +1544,12 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
false -> dict:store(QPid, QName, QNames0)
end, pmon:monitor(QPid, QMons0)}
end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
- State1 = process_routing_result(DeliveredQPids, XName, Mandatory, MsgSeqNo,
- Message,
- State#ch{queue_names = QNames1,
- queue_monitors = QMons1}),
+ State1 = process_routing_confirm(
+ DeliveredQPids, Confirmed, MsgSeqNo, XName,
+ process_routing_mandatory(
+ DeliveredQPids, Mandatory, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1})),
?INCR_STATS([{exchange_stats, XName, 1} |
[{queue_exchange_stats, {QName, XName}, 1} ||
QPid <- DeliveredQPids,
@@ -1555,22 +1559,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
%% TODO unbreak basic.return stats
-process_routing_result(_, _, _, undefined, _Msg, State) ->
+process_routing_mandatory(_, false, _MsgSeqNo, _Msg, State) ->
State;
-process_routing_result([], XName, false, MsgSeqNo, _Msg, State) ->
- record_confirms([{MsgSeqNo, XName}], State);
-process_routing_result([], XName, true, MsgSeqNo, Msg, State) ->
+process_routing_mandatory([], true, _MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
+ State;
+process_routing_mandatory(QPids, true, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(
+ MsgSeqNo, QPids, Msg, State#ch.mandatory)}.
+
+process_routing_confirm(_, false, _MsgSeqNo, _XName, State) ->
+ State;
+process_routing_confirm([], true, MsgSeqNo, XName, State) ->
+ exit(bang),
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_result(QPids, XName, Mandatory, MsgSeqNo, Msg, State) ->
- MandatoryTree = case Mandatory of
- false -> State#ch.mandatory;
- true -> dtree:insert(MsgSeqNo, QPids, Msg,
- State#ch.mandatory)
- end,
+process_routing_confirm(QPids, true, MsgSeqNo, XName, State) ->
+ exit(bang),
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed),
- mandatory = MandatoryTree}.
+ State#ch.unconfirmed)}.
%% process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
%% ok = basic_return(Msg, State, no_route),