diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-10-06 14:16:41 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-10-06 14:17:24 +0100 |
| commit | dee6c8a1989f7cb42dd0123adb4588777262b175 (patch) | |
| tree | 22ec44e160c5d334ad7083112d6e51e8c42cc3b8 | |
| parent | 1eb1ff1bf649c9fd94ea2b946fdbd75e8f5cbe07 (diff) | |
| download | rabbitmq-server-git-dee6c8a1989f7cb42dd0123adb4588777262b175.tar.gz | |
Fix credit flow related HA queue regression
When a down mirror would not clear credit flow.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_classic_queue.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_confirms.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_queue_type.erl | 11 |
5 files changed, 32 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 60632f5739..9e66445443 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1511,6 +1511,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) -> basic_consume(Q, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) -> + QName = amqqueue:get_name(Q), %% first phase argument validation %% each queue type may do further validations diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b788180b0f..fd4ecf6974 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -783,6 +783,7 @@ handle_info(emit_stats, State) -> handle_info({'DOWN', _MRef, process, QPid, Reason}, #ch{queue_states = QStates0, queue_monitors = _QMons} = State0) -> + credit_flow:peer_down(QPid), case rabbit_queue_type:handle_down(QPid, Reason, QStates0) of {ok, QState1, Actions} -> State1 = State0#ch{queue_states = QState1}, @@ -1792,7 +1793,8 @@ handle_consuming_queue_down_or_eol(QName, case catch basic_consume( QName, NoAck, ConsumerPrefetch, CTag, Exclusive, Args, true, StateN) of - {ok, StateN1} -> StateN1; + {ok, StateN1} -> + StateN1; _Err -> cancel_consumer(CTag, QName, StateN) end diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index c9d9b951c9..351b4961b9 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -201,7 +201,8 @@ cancel(Q, ConsumerTag, OkMsg, ActingUser, State) -> [non_neg_integer()], state()) -> {state(), rabbit_queue_type:actions()}. settle(complete, _CTag, MsgIds, State) -> - delegate:invoke_no_result(State#?STATE.pid, + Pid = State#?STATE.pid, + delegate:invoke_no_result(Pid, {gen_server2, cast, [{ack, MsgIds, self()}]}), {State, []}; settle(Op, _CTag, MsgIds, State) -> @@ -257,10 +258,11 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef, maps:filter(fun (_, #msg_status{pending = Pids}) -> lists:member(Pid, Pids) end, U0)), - {Unconfirmed, ConfirmedSeqNos, Rejected} = + {Unconfirmed, Settled, Rejected} = settle_seq_nos(MsgSeqNos, Pid, U0, down), - Actions = [{settled, QRef, ConfirmedSeqNos}, - {rejected, QRef, Rejected} | Actions0], + Actions = settlement_action( + settled, QRef, Settled, + settlement_action(rejected, QRef, Rejected, Actions0)), {ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions}; true -> %% any abnormal exit should be considered a full reject of the @@ -280,6 +282,11 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef, [{rejected, QRef, MsgIds} | Actions0]} end. +settlement_action(_Type, _QRef, [], Acc) -> + Acc; +settlement_action(Type, QRef, MsgSeqs, Acc) -> + [{Type, QRef, MsgSeqs} | Acc]. + -spec deliver([{amqqueue:amqqueue(), state()}], Delivery :: term()) -> {[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}. @@ -356,19 +363,23 @@ qpids(Qs, MsgNo) -> QRef = amqqueue:get_name(Q), Actions = [{monitor, QPid, QRef} | [{monitor, P, QRef} || P <- SPids]] ++ Actions0, - %% confirm record + %% confirm record only if MsgNo isn't undefined S = case S0 of #?STATE{unconfirmed = U0} -> Rec = [QPid | SPids], - U = U0#{MsgNo => #msg_status{pending = Rec}}, - S0#?STATE{unconfirmed = U}; + U = case MsgNo of + undefined -> + U0; + _ -> + U0#{MsgNo => #msg_status{pending = Rec}} + end, + S0#?STATE{pid = QPid, + unconfirmed = U}; stateless -> S0 end, - {[QPid | MPidAcc], - SPidAcc ++ SPids, - [{Q, S} | Qs0], - Actions} + {[QPid | MPidAcc], SPidAcc ++ SPids, + [{Q, S} | Qs0], Actions} end, {[], [], [], []}, Qs). %% internal-ish diff --git a/src/rabbit_confirms.erl b/src/rabbit_confirms.erl index 5aa3abdf12..2fe032d1f1 100644 --- a/src/rabbit_confirms.erl +++ b/src/rabbit_confirms.erl @@ -63,6 +63,7 @@ confirm(SeqNos, QName, #?MODULE{smallest = Smallest0, confirm_one(SeqNo, QName, Acc) end, {[], U0}, SeqNos), %% check if smallest is in Confirmed + %% TODO: this can be optimised by checking in the preceeding foldr Smallest = case lists:any(fun ({S, _}) -> S == Smallest0 end, Confirmed) of true -> diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 727ee43faf..40028c8241 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -367,20 +367,19 @@ recover(VHost, Qs) -> -spec handle_down(pid(), term(), state()) -> {ok, state(), actions()} | {eol, queue_ref()} | {error, term()}. -handle_down(Pid, Info, #?STATE{monitor_registry = Reg} = State0) -> +handle_down(Pid, Info, #?STATE{monitor_registry = Reg0} = State0) -> %% lookup queue ref in monitor registry - case Reg of - #{Pid := QRef} -> - %% TODO: remove Pid from monitor_registry + case maps:take(Pid, Reg0) of + {QRef, Reg} -> case handle_event(QRef, {down, Pid, Info}, State0) of {ok, State, Actions} -> - {ok, State, Actions}; + {ok, State#?STATE{monitor_registry = Reg}, Actions}; eol -> {eol, QRef}; Err -> Err end; - _ -> + error -> {ok, State0, []} end. |
