summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-23 17:11:10 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-23 17:11:10 +0000
commit55b5f4c07614ea84f5f4499eb1417cb6844e0e14 (patch)
tree76af9679b475bfa52c5a5c4e32c22a01c0a5d857
parent660df83677206e69ab5a4e4936531345110f9765 (diff)
parent12159aa2d3f5d18986e6550b0be8240d4588aaab (diff)
downloadrabbitmq-server-55b5f4c07614ea84f5f4499eb1417cb6844e0e14.tar.gz
merge bug24408 into default
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/dtree.erl11
-rw-r--r--src/rabbit_amqqueue.erl35
-rw-r--r--src/rabbit_amqqueue_process.erl22
-rw-r--r--src/rabbit_basic.erl21
-rw-r--r--src/rabbit_channel.erl101
-rw-r--r--src/rabbit_dead_letter.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_trace.erl6
10 files changed, 109 insertions, 97 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 8b42cdea..6d117e3d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -70,7 +70,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, confirm, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/dtree.erl b/src/dtree.erl
index 5ff36bd9..72abe248 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +53,7 @@
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(drop/2 :: (pk(), ?MODULE()) -> ?MODULE()).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -120,6 +121,14 @@ take_all(SK, {P, S}) ->
{KVs, {P1, prune(SKS, PKS, S)}}
end.
+%% Drop all entries for the given primary key (which does not have to exist).
+drop(PK, {P, S}) ->
+ case gb_trees:lookup(PK, P) of
+ none -> {P, S};
+ {value, {SKS, _V}} -> {gb_trees:delete(PK, P),
+ prune(SKS, gb_sets:singleton(PK), S)}
+ end.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 773ad60d..b8863aca 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -51,7 +51,7 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0, routing_result/0]).
+-export_type([name/0, qmsg/0]).
-type(name() :: rabbit_types:r('queue')).
-type(qpids() :: [pid()]).
@@ -61,7 +61,6 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(routing_result() :: 'routed' | 'unroutable').
-type(queue_or_absent() :: rabbit_types:amqqueue() |
{'absent', rabbit_types:amqqueue()}).
-type(not_found_or_absent() :: 'not_found' |
@@ -138,9 +137,9 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
+ qpids()).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
+ qpids()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -698,17 +697,11 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
-deliver([], #delivery{mandatory = false}, _Flow) ->
+deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
- {routed, []};
-
-deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
- %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver
- %% will deliver the message to the queue process asynchronously,
- %% and return true, which means all the QPids will always be
- %% returned. It is therefore safe to use a fire-and-forget cast
- %% here and return the QPids - the semantics is preserved. This
- %% scales much better than the case below.
+ [];
+
+deliver(Qs, Delivery, Flow) ->
{MPids, SPids} = qpids(Qs),
QPids = MPids ++ SPids,
case Flow of
@@ -725,19 +718,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
SMsg = {deliver, Delivery, true, Flow},
delegate:cast(MPids, MMsg),
delegate:cast(SPids, SMsg),
- {routed, QPids};
-
-deliver(Qs, Delivery, _Flow) ->
- {MPids, SPids} = qpids(Qs),
- %% see comment above
- MMsg = {deliver, Delivery, false},
- SMsg = {deliver, Delivery, true},
- {MRouted, _} = delegate:call(MPids, MMsg),
- {SRouted, _} = delegate:call(SPids, SMsg),
- case MRouted ++ SRouted of
- [] -> {unroutable, []};
- R -> {routed, [QPid || {QPid, ok} <- R]}
- end.
+ QPids.
qpids([]) -> {[], []}; %% optimisation
qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3322b3d1..2b0882b8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -425,9 +425,10 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+send_or_record_confirm(#delivery{confirm = false}, State) ->
{never, State};
-send_or_record_confirm(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{confirm = true,
+ sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
@@ -436,11 +437,19 @@ send_or_record_confirm(#delivery{sender = SenderPid,
msg_id_to_channel = MTC}) ->
MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
{eventually, State#q{msg_id_to_channel = MTC1}};
-send_or_record_confirm(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{confirm = true,
+ sender = SenderPid,
msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
+
discard(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message{id = MsgId}}, State) ->
@@ -496,6 +505,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
@@ -884,11 +894,6 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
reply(rabbit_queue_consumers:all(Consumers), State);
-handle_call({deliver, Delivery, Delivered}, From, State) ->
- %% Synchronous, "mandatory" deliver mode.
- gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, Delivered, State));
-
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
@@ -1040,7 +1045,6 @@ handle_cast({run_backing_queue, Mod, Fun},
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
- %% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
pmon:monitor(Sender, Senders);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 3d70be4b..a5dc6eb2 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,7 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
- extract_headers/1, map_headers/2, delivery/3, header_routes/1,
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1,
parse_expiration/1]).
-export([build_content/2, from_content/1, msg_size/1]).
@@ -31,8 +31,7 @@
-type(properties_input() ::
(rabbit_framing:amqp_property_record() | [{atom(), any()}])).
-type(publish_result() ::
- ({ok, rabbit_amqqueue:routing_result(), [pid()]}
- | rabbit_types:error('not_found'))).
+ ({ok, [pid()]} | rabbit_types:error('not_found'))).
-type(headers() :: rabbit_framing:amqp_table() | 'undefined').
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
@@ -46,8 +45,8 @@
properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/3 ::
- (boolean(), rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -93,10 +92,10 @@ publish(Exchange, RoutingKeyBin, Properties, Body) ->
%% erlang distributed network.
publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
- publish(X, delivery(Mandatory, Message, undefined));
+ publish(X, delivery(Mandatory, false, Message, undefined));
publish(XName, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
- publish(delivery(Mandatory, Message, undefined)).
+ publish(delivery(Mandatory, false, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
@@ -107,11 +106,11 @@ publish(Delivery = #delivery{
publish(X, Delivery) ->
Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
- {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
- {ok, RoutingRes, DeliveredQPids}.
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
+ {ok, DeliveredQPids}.
-delivery(Mandatory, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, sender = self(),
+delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 09798266..2079e8e7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,7 @@
queue_names, queue_monitors, consumer_mapping,
queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, capabilities, trace_state}).
+ unconfirmed, confirmed, mandatory, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -214,6 +214,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
publish_seqno = 1,
unconfirmed = dtree:empty(),
confirmed = [],
+ mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
@@ -232,8 +233,9 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) ->
case Msg of
- {confirm, _MsgSeqNos, _QPid} -> 5;
- _ -> 0
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ {mandatory_received, _MsgSeqNo, _QPid} -> 5;
+ _ -> 0
end.
prioritise_info(Msg, _Len, _State) ->
@@ -336,11 +338,14 @@ handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
-handle_cast({confirm, MsgSeqNos, From}, State) ->
- State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- Timeout = case C of [] -> hibernate; _ -> 0 end,
+handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
+
+handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
- {noreply, ensure_stats_timer(State1), Timeout}.
+ noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -405,6 +410,10 @@ noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
next_state(State) -> ensure_stats_timer(send_confirms(State)).
+noreply_coalesce(State = #ch{confirmed = C}) ->
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ {noreply, ensure_stats_timer(State), Timeout}.
+
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -592,12 +601,6 @@ record_confirms([], State) ->
record_confirms(MXs, State = #ch{confirmed = C}) ->
State#ch{confirmed = [MXs | C]}.
-confirm([], _QPid, State) ->
- State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
-
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
%% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
State1 = State#ch{state = running},
@@ -670,16 +673,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(Props, State),
check_expiration_header(Props),
+ DoConfirm = Tx =/= none orelse ConfirmEnabled,
{MsgSeqNo, State1} =
- case {Tx, ConfirmEnabled} of
- {none, false} -> {undefined, State};
- {_, _} -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case DoConfirm orelse Mandatory of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_in(Message, TraceState),
- Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(
+ Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
DQ = {Delivery, QNames},
{noreply, case Tx of
@@ -1204,12 +1209,17 @@ monitor_delivering_queue(NoAck, QPid, QName,
false -> sets:add_element(QPid, DQ)
end}.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
+ mandatory = Mand}) ->
+ {MMsgs, Mand1} = dtree:take(QPid, Mand),
+ [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
+ State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- send_nacks(MXs, State#ch{unconfirmed = UC1});
+ send_nacks(MXs, State1#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State#ch{unconfirmed = UC1})
+ record_confirms(MXs, State1#ch{unconfirmed = UC1})
+
end.
handle_consuming_queue_down(QPid,
@@ -1277,7 +1287,9 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content},
- #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
+ State = #ch{protocol = Protocol, writer_pid = WriterPid},
+ Reason) ->
+ ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State),
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
@@ -1427,18 +1439,19 @@ notify_limiter(Limiter, Acked) ->
end.
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- msg_seq_no = undefined,
mandatory = false},
[]}, State) -> %% optimisation
?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
+ mandatory = Mandatory,
+ confirm = Confirm,
msg_seq_no = MsgSeqNo},
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
- {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery),
%% The pmon:monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1457,31 +1470,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
false -> dict:store(QPid, QName, QNames0)
end, pmon:monitor(QPid, QMons0)}
end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message,
- State#ch{queue_names = QNames1,
- queue_monitors = QMons1}),
+ State1 = State#ch{queue_names = QNames1,
+ queue_monitors = QMons1},
+ %% NB: the order here is important since basic.returns must be
+ %% sent before confirms.
+ State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo,
+ Message, State1),
+ State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
+ XName, State2),
?INCR_STATS([{exchange_stats, XName, 1} |
[{queue_exchange_stats, {QName, XName}, 1} ||
QPid <- DeliveredQPids,
{ok, QName} <- [dict:find(QPid, QNames1)]]],
- publish, State1),
- State1.
+ publish, State3),
+ State3.
+
+process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
+ State;
+process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ State;
+process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+ State#ch.mandatory)}.
-process_routing_result(routed, _, _, undefined, _, State) ->
+process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
State;
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
+process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)};
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
- case MsgSeqNo of
- undefined -> State;
- _ -> record_confirms([{MsgSeqNo, XName}], State)
- end.
+ State#ch.unconfirmed)}.
send_nacks([], State) ->
State;
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 640b282e..b8a2cc9c 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -34,7 +34,7 @@
publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
{Queues, Cycles} = detect_cycles(Reason, DLMsg,
rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index ab8c62fe..447cd893 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -87,7 +87,7 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
- {ok, _RoutingRes, _DeliveredQPids} =
+ {ok, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 731351f0..f267467e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2408,8 +2408,8 @@ publish_and_confirm(Q, Payload, Count) ->
<<>>, #'P_basic'{delivery_mode = 2},
Payload),
Delivery = #delivery{mandatory = false, sender = self(),
- message = Msg, msg_seq_no = Seq},
- {routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
+ confirm = true, message = Msg, msg_seq_no = Seq},
+ _QPids = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index d0dcaa71..b08a9a1c 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -88,9 +88,9 @@ trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
ok;
trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}},
RKPrefix, RKSuffix, Extra) ->
- {ok, _, _} = rabbit_basic:publish(
- X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
- #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
+ {ok, _} = rabbit_basic:publish(
+ X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
ok.
msg_to_table(#basic_message{exchange_name = #resource{name = XName},