diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-22 11:55:35 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-22 11:55:35 +0000 |
commit | 3a29d0d519589d8aba1011d85ae36e93e75a2989 (patch) | |
tree | 0f26cd77ee141e4002ffaa239dc38c540e743c1a | |
parent | 860e1265688f1d56150b6a980e3460829ae4751e (diff) | |
download | rabbitmq-server-3a29d0d519589d8aba1011d85ae36e93e75a2989.tar.gz |
Substantially improve both performance and correctness: 1) really don't treat presence of MsgSeqNo as a cue we are doing confirms, and 2) get the right dtree function when we receive a MsgSeqNo.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/dtree.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 14 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 52 |
5 files changed, 49 insertions, 35 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 8b42cdea..ba52a407 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,7 +70,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, sender, message, msg_seq_no}). +-record(delivery, {mandatory, confirmed, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/src/dtree.erl b/src/dtree.erl index 5ff36bd9..f39d8e3a 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -32,7 +32,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, take_all/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -53,6 +53,7 @@ -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(drop/2 :: (pk(), ?MODULE()) -> ?MODULE()). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -120,6 +121,11 @@ take_all(SK, {P, S}) -> {KVs, {P1, prune(SKS, PKS, S)}} end. +%% Drop all entries for the given primary key. +drop(PK, {P, S}) -> + SKS = gb_trees:get(PK, P), + {gb_trees:delete(PK, P), prune(SKS, gb_sets:singleton(PK), S)}. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index df9748fb..23fbf93f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -425,9 +425,10 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. -send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) -> +send_or_record_confirm(#delivery{confirmed = false}, State) -> {never, State}; -send_or_record_confirm(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{confirmed = true, + sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, @@ -436,7 +437,8 @@ send_or_record_confirm(#delivery{sender = SenderPid, msg_id_to_channel = MTC}) -> MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), {eventually, State#q{msg_id_to_channel = MTC1}}; -send_or_record_confirm(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{confirmed = true, + sender = SenderPid, msg_seq_no = MsgSeqNo}, State) -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3e944867..454755e6 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,7 @@ -export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, prepend_table_header/3, - extract_headers/1, map_headers/2, delivery/3, header_routes/1, + extract_headers/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1]). -export([build_content/2, from_content/1, msg_size/1]). @@ -46,8 +46,8 @@ properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/3 :: - (boolean(), rabbit_types:message(), undefined | integer()) -> +-spec(delivery/4 :: + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -93,10 +93,10 @@ publish(Exchange, RoutingKeyBin, Properties, Body) -> %% erlang distributed network. publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), - publish(X, delivery(Mandatory, Message, undefined)); + publish(X, delivery(Mandatory, false, Message, undefined)); publish(XName, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), - publish(delivery(Mandatory, Message, undefined)). + publish(delivery(Mandatory, false, Message, undefined)). publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> @@ -110,8 +110,8 @@ publish(X, Delivery) -> DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), {ok, DeliveredQPids}. -delivery(Mandatory, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, sender = self(), +delivery(Mandatory, Confirmed, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, confirmed = Confirmed, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b862766a..346dd2e1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -633,7 +633,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) -> - {_MMsgs, Mand1} = dtree:take_all([MsgSeqNo], Mand), + Mand1 = dtree:drop(MsgSeqNo, Mand), State#ch{mandatory = Mand1}. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -708,16 +708,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(Props, State), check_expiration_header(Props), + DoConfirm = Tx =/= none orelse ConfirmEnabled, {MsgSeqNo, State1} = - case {Tx, ConfirmEnabled orelse Mandatory} of - {none, false} -> {undefined, State}; - {_, _} -> SeqNo = State#ch.publish_seqno, - {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + case DoConfirm orelse Mandatory of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_in(Message, TraceState), - Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), + Delivery = rabbit_basic:delivery( + Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), DQ = {Delivery, QNames}, {noreply, case Tx of @@ -1511,7 +1513,6 @@ notify_limiter(Limiter, Acked) -> end. deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - msg_seq_no = undefined, mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), @@ -1519,6 +1520,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, mandatory = Mandatory, + confirmed = Confirmed, msg_seq_no = MsgSeqNo}, DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> @@ -1542,10 +1544,12 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ false -> dict:store(QPid, QName, QNames0) end, pmon:monitor(QPid, QMons0)} end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), - State1 = process_routing_result(DeliveredQPids, XName, Mandatory, MsgSeqNo, - Message, - State#ch{queue_names = QNames1, - queue_monitors = QMons1}), + State1 = process_routing_confirm( + DeliveredQPids, Confirmed, MsgSeqNo, XName, + process_routing_mandatory( + DeliveredQPids, Mandatory, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1})), ?INCR_STATS([{exchange_stats, XName, 1} | [{queue_exchange_stats, {QName, XName}, 1} || QPid <- DeliveredQPids, @@ -1555,22 +1559,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ %% TODO unbreak basic.return stats -process_routing_result(_, _, _, undefined, _Msg, State) -> +process_routing_mandatory(_, false, _MsgSeqNo, _Msg, State) -> State; -process_routing_result([], XName, false, MsgSeqNo, _Msg, State) -> - record_confirms([{MsgSeqNo, XName}], State); -process_routing_result([], XName, true, MsgSeqNo, Msg, State) -> +process_routing_mandatory([], true, _MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), + State; +process_routing_mandatory(QPids, true, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert( + MsgSeqNo, QPids, Msg, State#ch.mandatory)}. + +process_routing_confirm(_, false, _MsgSeqNo, _XName, State) -> + State; +process_routing_confirm([], true, MsgSeqNo, XName, State) -> + exit(bang), record_confirms([{MsgSeqNo, XName}], State); -process_routing_result(QPids, XName, Mandatory, MsgSeqNo, Msg, State) -> - MandatoryTree = case Mandatory of - false -> State#ch.mandatory; - true -> dtree:insert(MsgSeqNo, QPids, Msg, - State#ch.mandatory) - end, +process_routing_confirm(QPids, true, MsgSeqNo, XName, State) -> + exit(bang), State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, - State#ch.unconfirmed), - mandatory = MandatoryTree}. + State#ch.unconfirmed)}. %% process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> %% ok = basic_return(Msg, State, no_route), |