diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-23 17:11:10 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-23 17:11:10 +0000 |
commit | 55b5f4c07614ea84f5f4499eb1417cb6844e0e14 (patch) | |
tree | 76af9679b475bfa52c5a5c4e32c22a01c0a5d857 | |
parent | 660df83677206e69ab5a4e4936531345110f9765 (diff) | |
parent | 12159aa2d3f5d18986e6550b0be8240d4588aaab (diff) | |
download | rabbitmq-server-55b5f4c07614ea84f5f4499eb1417cb6844e0e14.tar.gz |
merge bug24408 into default
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/dtree.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 35 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 21 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 101 | ||||
-rw-r--r-- | src/rabbit_dead_letter.erl | 2 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 6 |
10 files changed, 109 insertions, 97 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 8b42cdea..6d117e3d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,7 +70,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, sender, message, msg_seq_no}). +-record(delivery, {mandatory, confirm, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/src/dtree.erl b/src/dtree.erl index 5ff36bd9..72abe248 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -32,7 +32,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, take_all/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -53,6 +53,7 @@ -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(drop/2 :: (pk(), ?MODULE()) -> ?MODULE()). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -120,6 +121,14 @@ take_all(SK, {P, S}) -> {KVs, {P1, prune(SKS, PKS, S)}} end. +%% Drop all entries for the given primary key (which does not have to exist). +drop(PK, {P, S}) -> + case gb_trees:lookup(PK, P) of + none -> {P, S}; + {value, {SKS, _V}} -> {gb_trees:delete(PK, P), + prune(SKS, gb_sets:singleton(PK), S)} + end. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 773ad60d..b8863aca 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -51,7 +51,7 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0, routing_result/0]). +-export_type([name/0, qmsg/0]). -type(name() :: rabbit_types:r('queue')). -type(qpids() :: [pid()]). @@ -61,7 +61,6 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(routing_result() :: 'routed' | 'unroutable'). -type(queue_or_absent() :: rabbit_types:amqqueue() | {'absent', rabbit_types:amqqueue()}). -type(not_found_or_absent() :: 'not_found' | @@ -138,9 +137,9 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). + qpids()). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). + qpids()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -698,17 +697,11 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. -deliver([], #delivery{mandatory = false}, _Flow) -> +deliver([], _Delivery, _Flow) -> %% /dev/null optimisation - {routed, []}; - -deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> - %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver - %% will deliver the message to the queue process asynchronously, - %% and return true, which means all the QPids will always be - %% returned. It is therefore safe to use a fire-and-forget cast - %% here and return the QPids - the semantics is preserved. This - %% scales much better than the case below. + []; + +deliver(Qs, Delivery, Flow) -> {MPids, SPids} = qpids(Qs), QPids = MPids ++ SPids, case Flow of @@ -725,19 +718,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> SMsg = {deliver, Delivery, true, Flow}, delegate:cast(MPids, MMsg), delegate:cast(SPids, SMsg), - {routed, QPids}; - -deliver(Qs, Delivery, _Flow) -> - {MPids, SPids} = qpids(Qs), - %% see comment above - MMsg = {deliver, Delivery, false}, - SMsg = {deliver, Delivery, true}, - {MRouted, _} = delegate:call(MPids, MMsg), - {SRouted, _} = delegate:call(SPids, SMsg), - case MRouted ++ SRouted of - [] -> {unroutable, []}; - R -> {routed, [QPid || {QPid, ok} <- R]} - end. + QPids. qpids([]) -> {[], []}; %% optimisation qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3322b3d1..2b0882b8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -425,9 +425,10 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. -send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) -> +send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; -send_or_record_confirm(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{confirm = true, + sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, @@ -436,11 +437,19 @@ send_or_record_confirm(#delivery{sender = SenderPid, msg_id_to_channel = MTC}) -> MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), {eventually, State#q{msg_id_to_channel = MTC1}}; -send_or_record_confirm(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{confirm = true, + sender = SenderPid, msg_seq_no = MsgSeqNo}, State) -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). + discard(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message{id = MsgId}}, State) -> @@ -496,6 +505,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), @@ -884,11 +894,6 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State = #q{consumers = Consumers}) -> reply(rabbit_queue_consumers:all(Consumers), State); -handle_call({deliver, Delivery, Delivered}, From, State) -> - %% Synchronous, "mandatory" deliver mode. - gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, Delivered, State)); - handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the @@ -1040,7 +1045,6 @@ handle_cast({run_backing_queue, Mod, Fun}, handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> - %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of flow -> credit_flow:ack(Sender), pmon:monitor(Sender, Senders); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3d70be4b..a5dc6eb2 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,7 @@ -export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, prepend_table_header/3, - extract_headers/1, map_headers/2, delivery/3, header_routes/1, + extract_headers/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1]). -export([build_content/2, from_content/1, msg_size/1]). @@ -31,8 +31,7 @@ -type(properties_input() :: (rabbit_framing:amqp_property_record() | [{atom(), any()}])). -type(publish_result() :: - ({ok, rabbit_amqqueue:routing_result(), [pid()]} - | rabbit_types:error('not_found'))). + ({ok, [pid()]} | rabbit_types:error('not_found'))). -type(headers() :: rabbit_framing:amqp_table() | 'undefined'). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). @@ -46,8 +45,8 @@ properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/3 :: - (boolean(), rabbit_types:message(), undefined | integer()) -> +-spec(delivery/4 :: + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -93,10 +92,10 @@ publish(Exchange, RoutingKeyBin, Properties, Body) -> %% erlang distributed network. publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), - publish(X, delivery(Mandatory, Message, undefined)); + publish(X, delivery(Mandatory, false, Message, undefined)); publish(XName, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), - publish(delivery(Mandatory, Message, undefined)). + publish(delivery(Mandatory, false, Message, undefined)). publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> @@ -107,11 +106,11 @@ publish(Delivery = #delivery{ publish(X, Delivery) -> Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), - {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), - {ok, RoutingRes, DeliveredQPids}. + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), + {ok, DeliveredQPids}. -delivery(Mandatory, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, sender = self(), +delivery(Mandatory, Confirm, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 09798266..2079e8e7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ queue_names, queue_monitors, consumer_mapping, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, capabilities, trace_state}). + unconfirmed, confirmed, mandatory, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -214,6 +214,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, publish_seqno = 1, unconfirmed = dtree:empty(), confirmed = [], + mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), @@ -232,8 +233,9 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_cast(Msg, _Len, _State) -> case Msg of - {confirm, _MsgSeqNos, _QPid} -> 5; - _ -> 0 + {confirm, _MsgSeqNos, _QPid} -> 5; + {mandatory_received, _MsgSeqNo, _QPid} -> 5; + _ -> 0 end. prioritise_info(Msg, _Len, _State) -> @@ -336,11 +338,14 @@ handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); -handle_cast({confirm, MsgSeqNos, From}, State) -> - State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), - Timeout = case C of [] -> hibernate; _ -> 0 end, +handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> + %% NB: don't call noreply/1 since we don't want to send confirms. + noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)}); + +handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), %% NB: don't call noreply/1 since we don't want to send confirms. - {noreply, ensure_stats_timer(State1), Timeout}. + noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})). handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), @@ -405,6 +410,10 @@ noreply(NewState) -> {noreply, next_state(NewState), hibernate}. next_state(State) -> ensure_stats_timer(send_confirms(State)). +noreply_coalesce(State = #ch{confirmed = C}) -> + Timeout = case C of [] -> hibernate; _ -> 0 end, + {noreply, ensure_stats_timer(State), Timeout}. + ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats). @@ -592,12 +601,6 @@ record_confirms([], State) -> record_confirms(MXs, State = #ch{confirmed = C}) -> State#ch{confirmed = [MXs | C]}. -confirm([], _QPid, State) -> - State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), - record_confirms(MXs, State#ch{unconfirmed = UC1}). - handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? State1 = State#ch{state = running}, @@ -670,16 +673,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(Props, State), check_expiration_header(Props), + DoConfirm = Tx =/= none orelse ConfirmEnabled, {MsgSeqNo, State1} = - case {Tx, ConfirmEnabled} of - {none, false} -> {undefined, State}; - {_, _} -> SeqNo = State#ch.publish_seqno, - {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + case DoConfirm orelse Mandatory of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_in(Message, TraceState), - Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), + Delivery = rabbit_basic:delivery( + Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), DQ = {Delivery, QNames}, {noreply, case Tx of @@ -1204,12 +1209,17 @@ monitor_delivering_queue(NoAck, QPid, QName, false -> sets:add_element(QPid, DQ) end}. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, + mandatory = Mand}) -> + {MMsgs, Mand1} = dtree:take(QPid, Mand), + [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs], + State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), - send_nacks(MXs, State#ch{unconfirmed = UC1}); + send_nacks(MXs, State1#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State#ch{unconfirmed = UC1}) + record_confirms(MXs, State1#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, @@ -1277,7 +1287,9 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}, - #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + State = #ch{protocol = Protocol, writer_pid = WriterPid}, + Reason) -> + ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, @@ -1427,18 +1439,19 @@ notify_limiter(Limiter, Acked) -> end. deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - msg_seq_no = undefined, mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, + mandatory = Mandatory, + confirm = Confirm, msg_seq_no = MsgSeqNo}, DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> Qs = rabbit_amqqueue:lookup(DelQNames), - {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery), %% The pmon:monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean @@ -1457,31 +1470,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ false -> dict:store(QPid, QName, QNames0) end, pmon:monitor(QPid, QMons0)} end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), - State1 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, - State#ch{queue_names = QNames1, - queue_monitors = QMons1}), + State1 = State#ch{queue_names = QNames1, + queue_monitors = QMons1}, + %% NB: the order here is important since basic.returns must be + %% sent before confirms. + State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo, + Message, State1), + State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, + XName, State2), ?INCR_STATS([{exchange_stats, XName, 1} | [{queue_exchange_stats, {QName, XName}, 1} || QPid <- DeliveredQPids, {ok, QName} <- [dict:find(QPid, QNames1)]]], - publish, State1), - State1. + publish, State3), + State3. + +process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> + State; +process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State, no_route), + State; +process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg, + State#ch.mandatory)}. -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_confirm(false, _, _MsgSeqNo, _XName, State) -> State; -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> +process_routing_confirm(true, [], MsgSeqNo, XName, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> +process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, - State#ch.unconfirmed)}; -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State, no_route), - ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State), - case MsgSeqNo of - undefined -> State; - _ -> record_confirms([{MsgSeqNo, XName}], State) - end. + State#ch.unconfirmed)}. send_nacks([], State) -> State; diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 640b282e..b8a2cc9c 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -34,7 +34,7 @@ publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, undefined), + Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), {Queues, Cycles} = detect_cycles(Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index ab8c62fe..447cd893 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -87,7 +87,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, - {ok, _RoutingRes, _DeliveredQPids} = + {ok, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 731351f0..f267467e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2408,8 +2408,8 @@ publish_and_confirm(Q, Payload, Count) -> <<>>, #'P_basic'{delivery_mode = 2}, Payload), Delivery = #delivery{mandatory = false, sender = self(), - message = Msg, msg_seq_no = Seq}, - {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) + confirm = true, message = Msg, msg_seq_no = Seq}, + _QPids = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index d0dcaa71..b08a9a1c 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -88,9 +88,9 @@ trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, ok; trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}}, RKPrefix, RKSuffix, Extra) -> - {ok, _, _} = rabbit_basic:publish( - X, <<RKPrefix/binary, ".", RKSuffix/binary>>, - #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), + {ok, _} = rabbit_basic:publish( + X, <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), ok. msg_to_table(#basic_message{exchange_name = #resource{name = XName}, |