diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-21 17:49:44 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-21 17:49:44 +0000 |
commit | 860e1265688f1d56150b6a980e3460829ae4751e (patch) | |
tree | 91a8fb5be0f5acfb99daa7f603e3eab056eefc28 | |
parent | 609d80ce2799a783ecf659741b0da538d56449e1 (diff) | |
download | rabbitmq-server-860e1265688f1d56150b6a980e3460829ae4751e.tar.gz |
Get dtree use right: we want take_all/2 when we get a mandatory ack (since one queue getting it is enough). take/2 on DOWN is correct.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 12 |
2 files changed, 6 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 44754788..df9748fb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -446,7 +446,7 @@ send_mandatory(#delivery{mandatory = false}) -> send_mandatory(#delivery{mandatory = true, sender = SenderPid, msg_seq_no = MsgSeqNo}) -> - gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo, self()}). + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). discard(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dba826fc..b862766a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -349,8 +349,8 @@ handle_cast(force_event_refresh, State) -> noreply(State); %% TODO duplication? -handle_cast({mandatory_received, MsgSeqNo, From}, State) -> - State1 = #ch{mandatory = M} = handle_mandatory(MsgSeqNo, From, State), +handle_cast({mandatory_received, MsgSeqNo}, State) -> + State1 = #ch{mandatory = M} = handle_mandatory(MsgSeqNo, State), Timeout = case M of [] -> hibernate; _ -> 0 end, %% NB: don't call noreply/1 since we don't want to send confirms. {noreply, ensure_stats_timer(State1), Timeout}; @@ -632,9 +632,9 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), record_confirms(MXs, State#ch{unconfirmed = UC1}). -handle_mandatory(MsgSeqNo, QPid, State = #ch{mandatory = UC}) -> - {_MXs, UC1} = dtree:take([MsgSeqNo], QPid, UC), - State#ch{mandatory = UC1}. +handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) -> + {_MMsgs, Mand1} = dtree:take_all([MsgSeqNo], Mand), + State#ch{mandatory = Mand1}. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? @@ -1266,9 +1266,7 @@ monitor_delivering_queue(NoAck, QPid, QName, handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, mandatory = Mand}) -> - %% TODO do we need take_all here? {MMsgs, Mand1} = dtree:take(QPid, Mand), - io:format("returning ~p~n", [MMsgs]), [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs], State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of |