summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-21 17:49:44 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-21 17:49:44 +0000
commit860e1265688f1d56150b6a980e3460829ae4751e (patch)
tree91a8fb5be0f5acfb99daa7f603e3eab056eefc28
parent609d80ce2799a783ecf659741b0da538d56449e1 (diff)
downloadrabbitmq-server-860e1265688f1d56150b6a980e3460829ae4751e.tar.gz
Get dtree use right: we want take_all/2 when we get a mandatory ack (since one queue getting it is enough). take/2 on DOWN is correct.
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl12
2 files changed, 6 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 44754788..df9748fb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -446,7 +446,7 @@ send_mandatory(#delivery{mandatory = false}) ->
send_mandatory(#delivery{mandatory = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo}) ->
- gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo, self()}).
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
discard(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dba826fc..b862766a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -349,8 +349,8 @@ handle_cast(force_event_refresh, State) ->
noreply(State);
%% TODO duplication?
-handle_cast({mandatory_received, MsgSeqNo, From}, State) ->
- State1 = #ch{mandatory = M} = handle_mandatory(MsgSeqNo, From, State),
+handle_cast({mandatory_received, MsgSeqNo}, State) ->
+ State1 = #ch{mandatory = M} = handle_mandatory(MsgSeqNo, State),
Timeout = case M of [] -> hibernate; _ -> 0 end,
%% NB: don't call noreply/1 since we don't want to send confirms.
{noreply, ensure_stats_timer(State1), Timeout};
@@ -632,9 +632,9 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
record_confirms(MXs, State#ch{unconfirmed = UC1}).
-handle_mandatory(MsgSeqNo, QPid, State = #ch{mandatory = UC}) ->
- {_MXs, UC1} = dtree:take([MsgSeqNo], QPid, UC),
- State#ch{mandatory = UC1}.
+handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) ->
+ {_MMsgs, Mand1} = dtree:take_all([MsgSeqNo], Mand),
+ State#ch{mandatory = Mand1}.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
%% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
@@ -1266,9 +1266,7 @@ monitor_delivering_queue(NoAck, QPid, QName,
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
mandatory = Mand}) ->
- %% TODO do we need take_all here?
{MMsgs, Mand1} = dtree:take(QPid, Mand),
- io:format("returning ~p~n", [MMsgs]),
[basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of