From ddf7763729950ad71d782818b3bb105f95e458e9 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 17 Sep 2012 14:15:11 +0100 Subject: nuke 'immediate' --- include/rabbit.hrl | 2 +- src/rabbit_amqqueue.erl | 42 +++++++++++++------------------- src/rabbit_amqqueue_process.erl | 28 ++++----------------- src/rabbit_basic.erl | 32 ++++++++++++------------ src/rabbit_channel.erl | 21 +++++++--------- src/rabbit_error_logger.erl | 2 +- src/rabbit_mirror_queue_slave.erl | 51 ++++++++++++--------------------------- src/rabbit_tests.erl | 5 ++-- src/rabbit_types.erl | 1 - 9 files changed, 66 insertions(+), 118 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d6fac46d..fff92205 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -73,7 +73,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, sender, message, msg_seq_no}). +-record(delivery, {mandatory, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d566ac87..f80559ba 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -60,7 +60,7 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(routing_result() :: 'routed' | 'unroutable'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -645,18 +645,17 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. -deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> +deliver([], #delivery{mandatory = false}, _Flow) -> %% /dev/null optimisation {routed, []}; -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. +deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> + %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver + %% will deliver the message to the queue process asynchronously, + %% and return true, which means all the QPids will always be + %% returned. It is therefore safe to use a fire-and-forget cast + %% here and return the QPids - the semantics is preserved. This + %% scales much better than the case below. QPids = qpids(Qs), case Flow of flow -> [credit_flow:send(QPid) || QPid <- QPids]; @@ -668,21 +667,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> end), {routed, QPids}; -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, - _Flow) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} +deliver(Qs, Delivery, _Flow) -> + case delegate:invoke( + qpids(Qs), fun (QPid) -> + ok = gen_server2:call(QPid, {deliver, Delivery}, + infinity) + end) of + {[], _} -> {unroutable, []}; + {R , _} -> {routed, [QPid || {QPid, ok} <- R]} end. qpids(Qs) -> lists:append([[QPid | SPids] || diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 20ba4574..d6a5523a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -770,7 +770,7 @@ dead_letter_fun(Reason, _State) -> dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> DLMsg = make_dead_letter_msg(Reason, Msg, State), - Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), @@ -1032,27 +1032,9 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> - %% FIXME: Is this correct semantics? - %% - %% I'm worried in particular about the case where an exchange has - %% two queues against a particular routing key, and a message is - %% sent in immediate mode through the binding. In non-immediate - %% mode, both queues get the message, saving it for later if - %% there's noone ready to receive it just now. In immediate mode, - %% should both queues still get the message, somehow, or should - %% just all ready-to-consume queues get the message, with unready - %% queues discarding the message? - %% - Confirm = should_confirm_message(Delivery, State), - {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), - reply(Delivered, case Delivered of - true -> maybe_record_confirm_message(Confirm, State1); - false -> discard_delivery(Delivery, State1) - end); - -handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> - gen_server2:reply(From, true), +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), noreply(deliver_or_enqueue(Delivery, State)); handle_call({notify_down, ChPid}, From, State) -> @@ -1198,7 +1180,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State = #q{senders = Senders}) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of flow -> credit_flow:ack(Sender), pmon:monitor(Sender, Senders); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 734456d3..db2b7e95 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,9 +18,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/4, publish/6, publish/1, +-export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, map_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/3, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -40,13 +40,13 @@ -spec(publish/4 :: (exchange_input(), rabbit_router:routing_key(), properties_input(), body_input()) -> publish_result()). --spec(publish/6 :: - (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), +-spec(publish/5 :: + (exchange_input(), rabbit_router:routing_key(), boolean(), properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: - (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> +-spec(delivery/3 :: + (boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -80,18 +80,16 @@ %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, Properties, Body). + publish(Exchange, RoutingKeyBin, false, Properties, Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> - publish(X, delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)); -publish(XName, RKey, Mandatory, Immediate, Props, Body) -> - publish(delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)). +publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(X, delivery(Mandatory, Message, undefined)); +publish(XName, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(delivery(Mandatory, Message, undefined)). publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> @@ -105,8 +103,8 @@ publish(X, Delivery) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), {ok, RoutingRes, DeliveredQPids}. -delivery(Mandatory, Immediate, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), +delivery(Mandatory, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e50e823c..e8f3aab3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -598,10 +598,12 @@ handle_method(_Method, _, #ch{tx_status = TxStatus}) handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; +handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> + rabbit_misc:protocol_error(not_implemented, "immediate=true", []); + handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, + mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, tx_status = TxStatus, confirm_enabled = ConfirmEnabled, @@ -623,8 +625,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_trace_in(Message, TraceState), - Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, - MsgSeqNo), + Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), {noreply, case TxStatus of @@ -1342,20 +1343,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QPid <- DeliveredQPids]], publish, State2), State2. -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); -process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{XName, 1}], return_not_delivered, State), - record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, State#ch.unconfirmed)}. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index f1672f4e..a9af2d8a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -81,7 +81,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(LogExch, RoutingKey, false, false, + rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3e45f026..1f6567e0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -70,7 +70,7 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag ack_num, @@ -167,27 +167,10 @@ init_it(Self, Node, QueueName) -> end end. -handle_call({deliver, Delivery = #delivery { immediate = true }}, - From, State) -> - %% It is safe to reply 'false' here even if a) we've not seen the - %% msg via gm, or b) the master dies before we receive the msg via - %% gm. In the case of (a), we will eventually receive the msg via - %% gm, and it's only the master's result to the channel that is - %% important. In the case of (b), if the master does die and we do - %% get promoted then at that point we have no consumers, thus - %% 'false' is precisely the correct answer. However, we must be - %% careful to _not_ enqueue the message in this case. - - %% Note this is distinct from the case where we receive the msg - %% via gm first, then we're promoted to master, and only then do - %% we receive the msg from the channel. - gen_server2:reply(From, false), %% master may deliver it, not us - noreply(maybe_enqueue_message(Delivery, false, State)); - -handle_call({deliver, Delivery = #delivery { mandatory = true }}, - From, State) -> - gen_server2:reply(From, true), %% amqqueue throws away the result anyway - noreply(maybe_enqueue_message(Delivery, true, State)); +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), + noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, @@ -232,12 +215,12 @@ handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + %% Asynchronous, non-"mandatory", deliver mode. case Flow of flow -> credit_flow:ack(Sender); noflow -> ok end, - noreply(maybe_enqueue_message(Delivery, true, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -554,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - {Delivery, true} <- queue:to_list(PubQ)], + Delivery <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, Deliveries, KS, MTC), @@ -655,14 +638,13 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, sender = ChPid }, - EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; {ok, {confirmed, ChPid}} -> @@ -732,10 +714,9 @@ process_instruction( {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, {published, ChPid}, MS)}; - {{value, {Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. @@ -747,7 +728,7 @@ process_instruction( ChPid, [MsgSeqNo]), MS end}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly @@ -784,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {empty, _MQ} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, discarded, MS)}; - {{value, {#delivery { message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, #delivery { message = #basic_message { id = MsgId } }}, + MQ2} -> %% We've already seen it from the channel, we're not %% going to see this again, so don't add it to MS {MQ2, PendingCh, MS}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3cc0e5db..08535e7d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -656,7 +656,6 @@ test_topic_expect_match(X, List) -> #'P_basic'{}, <<>>), Res = rabbit_exchange_type_topic:route( X, #delivery{mandatory = false, - immediate = false, sender = self(), message = Message}), ExpectedRes = lists:map( @@ -2194,8 +2193,8 @@ publish_and_confirm(Q, Payload, Count) -> Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = 2}, Payload), - Delivery = #delivery{mandatory = false, immediate = false, - sender = self(), message = Msg, msg_seq_no = Seq}, + Delivery = #delivery{mandatory = false, sender = self(), + message = Msg, msg_seq_no = Seq}, {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 8966bcab..f488afb4 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -69,7 +69,6 @@ -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), - immediate :: boolean(), sender :: pid(), message :: message()}). -type(message_properties() :: -- cgit v1.2.1