summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-22 11:55:35 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-22 11:55:35 +0000
commit3a29d0d519589d8aba1011d85ae36e93e75a2989 (patch)
tree0f26cd77ee141e4002ffaa239dc38c540e743c1a
parent860e1265688f1d56150b6a980e3460829ae4751e (diff)
downloadrabbitmq-server-3a29d0d519589d8aba1011d85ae36e93e75a2989.tar.gz
Substantially improve both performance and correctness: 1) really don't treat presence of MsgSeqNo as a cue we are doing confirms, and 2) get the right dtree function when we receive a MsgSeqNo.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/dtree.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_basic.erl14
-rw-r--r--src/rabbit_channel.erl52
5 files changed, 49 insertions, 35 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 8b42cdea..ba52a407 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -70,7 +70,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, confirmed, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/dtree.erl b/src/dtree.erl
index 5ff36bd9..f39d8e3a 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +53,7 @@
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(drop/2 :: (pk(), ?MODULE()) -> ?MODULE()).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -120,6 +121,11 @@ take_all(SK, {P, S}) ->
{KVs, {P1, prune(SKS, PKS, S)}}
end.
+%% Drop all entries for the given primary key.
+drop(PK, {P, S}) ->
+ SKS = gb_trees:get(PK, P),
+ {gb_trees:delete(PK, P), prune(SKS, gb_sets:singleton(PK), S)}.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index df9748fb..23fbf93f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -425,9 +425,10 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+send_or_record_confirm(#delivery{confirmed = false}, State) ->
{never, State};
-send_or_record_confirm(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{confirmed = true,
+ sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
@@ -436,7 +437,8 @@ send_or_record_confirm(#delivery{sender = SenderPid,
msg_id_to_channel = MTC}) ->
MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
{eventually, State#q{msg_id_to_channel = MTC1}};
-send_or_record_confirm(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{confirmed = true,
+ sender = SenderPid,
msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 3e944867..454755e6 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,7 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
- extract_headers/1, map_headers/2, delivery/3, header_routes/1,
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1,
parse_expiration/1]).
-export([build_content/2, from_content/1, msg_size/1]).
@@ -46,8 +46,8 @@
properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/3 ::
- (boolean(), rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -93,10 +93,10 @@ publish(Exchange, RoutingKeyBin, Properties, Body) ->
%% erlang distributed network.
publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
- publish(X, delivery(Mandatory, Message, undefined));
+ publish(X, delivery(Mandatory, false, Message, undefined));
publish(XName, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
- publish(delivery(Mandatory, Message, undefined)).
+ publish(delivery(Mandatory, false, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
@@ -110,8 +110,8 @@ publish(X, Delivery) ->
DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, DeliveredQPids}.
-delivery(Mandatory, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, sender = self(),
+delivery(Mandatory, Confirmed, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, confirmed = Confirmed, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b862766a..346dd2e1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -633,7 +633,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) ->
- {_MMsgs, Mand1} = dtree:take_all([MsgSeqNo], Mand),
+ Mand1 = dtree:drop(MsgSeqNo, Mand),
State#ch{mandatory = Mand1}.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -708,16 +708,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(Props, State),
check_expiration_header(Props),
+ DoConfirm = Tx =/= none orelse ConfirmEnabled,
{MsgSeqNo, State1} =
- case {Tx, ConfirmEnabled orelse Mandatory} of
- {none, false} -> {undefined, State};
- {_, _} -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case DoConfirm orelse Mandatory of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_in(Message, TraceState),
- Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(
+ Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
DQ = {Delivery, QNames},
{noreply, case Tx of
@@ -1511,7 +1513,6 @@ notify_limiter(Limiter, Acked) ->
end.
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- msg_seq_no = undefined,
mandatory = false},
[]}, State) -> %% optimisation
?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
@@ -1519,6 +1520,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
mandatory = Mandatory,
+ confirmed = Confirmed,
msg_seq_no = MsgSeqNo},
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
@@ -1542,10 +1544,12 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
false -> dict:store(QPid, QName, QNames0)
end, pmon:monitor(QPid, QMons0)}
end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
- State1 = process_routing_result(DeliveredQPids, XName, Mandatory, MsgSeqNo,
- Message,
- State#ch{queue_names = QNames1,
- queue_monitors = QMons1}),
+ State1 = process_routing_confirm(
+ DeliveredQPids, Confirmed, MsgSeqNo, XName,
+ process_routing_mandatory(
+ DeliveredQPids, Mandatory, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1})),
?INCR_STATS([{exchange_stats, XName, 1} |
[{queue_exchange_stats, {QName, XName}, 1} ||
QPid <- DeliveredQPids,
@@ -1555,22 +1559,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
%% TODO unbreak basic.return stats
-process_routing_result(_, _, _, undefined, _Msg, State) ->
+process_routing_mandatory(_, false, _MsgSeqNo, _Msg, State) ->
State;
-process_routing_result([], XName, false, MsgSeqNo, _Msg, State) ->
- record_confirms([{MsgSeqNo, XName}], State);
-process_routing_result([], XName, true, MsgSeqNo, Msg, State) ->
+process_routing_mandatory([], true, _MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
+ State;
+process_routing_mandatory(QPids, true, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(
+ MsgSeqNo, QPids, Msg, State#ch.mandatory)}.
+
+process_routing_confirm(_, false, _MsgSeqNo, _XName, State) ->
+ State;
+process_routing_confirm([], true, MsgSeqNo, XName, State) ->
+ exit(bang),
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_result(QPids, XName, Mandatory, MsgSeqNo, Msg, State) ->
- MandatoryTree = case Mandatory of
- false -> State#ch.mandatory;
- true -> dtree:insert(MsgSeqNo, QPids, Msg,
- State#ch.mandatory)
- end,
+process_routing_confirm(QPids, true, MsgSeqNo, XName, State) ->
+ exit(bang),
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed),
- mandatory = MandatoryTree}.
+ State#ch.unconfirmed)}.
%% process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
%% ok = basic_return(Msg, State, no_route),