summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-10-06 14:16:41 +0100
committerkjnilsson <knilsson@pivotal.io>2020-10-06 14:17:24 +0100
commitdee6c8a1989f7cb42dd0123adb4588777262b175 (patch)
tree22ec44e160c5d334ad7083112d6e51e8c42cc3b8
parent1eb1ff1bf649c9fd94ea2b946fdbd75e8f5cbe07 (diff)
downloadrabbitmq-server-git-dee6c8a1989f7cb42dd0123adb4588777262b175.tar.gz
Fix credit flow related HA queue regression
When a down mirror would not clear credit flow.
-rw-r--r--src/rabbit_amqqueue.erl1
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_classic_queue.erl33
-rw-r--r--src/rabbit_confirms.erl1
-rw-r--r--src/rabbit_queue_type.erl11
5 files changed, 32 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 60632f5739..9e66445443 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1511,6 +1511,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
basic_consume(Q, NoAck, ChPid, LimiterPid,
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) ->
+
QName = amqqueue:get_name(Q),
%% first phase argument validation
%% each queue type may do further validations
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b788180b0f..fd4ecf6974 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -783,6 +783,7 @@ handle_info(emit_stats, State) ->
handle_info({'DOWN', _MRef, process, QPid, Reason},
#ch{queue_states = QStates0,
queue_monitors = _QMons} = State0) ->
+ credit_flow:peer_down(QPid),
case rabbit_queue_type:handle_down(QPid, Reason, QStates0) of
{ok, QState1, Actions} ->
State1 = State0#ch{queue_states = QState1},
@@ -1792,7 +1793,8 @@ handle_consuming_queue_down_or_eol(QName,
case catch basic_consume(
QName, NoAck, ConsumerPrefetch, CTag,
Exclusive, Args, true, StateN) of
- {ok, StateN1} -> StateN1;
+ {ok, StateN1} ->
+ StateN1;
_Err ->
cancel_consumer(CTag, QName, StateN)
end
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index c9d9b951c9..351b4961b9 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -201,7 +201,8 @@ cancel(Q, ConsumerTag, OkMsg, ActingUser, State) ->
[non_neg_integer()], state()) ->
{state(), rabbit_queue_type:actions()}.
settle(complete, _CTag, MsgIds, State) ->
- delegate:invoke_no_result(State#?STATE.pid,
+ Pid = State#?STATE.pid,
+ delegate:invoke_no_result(Pid,
{gen_server2, cast, [{ack, MsgIds, self()}]}),
{State, []};
settle(Op, _CTag, MsgIds, State) ->
@@ -257,10 +258,11 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
maps:filter(fun (_, #msg_status{pending = Pids}) ->
lists:member(Pid, Pids)
end, U0)),
- {Unconfirmed, ConfirmedSeqNos, Rejected} =
+ {Unconfirmed, Settled, Rejected} =
settle_seq_nos(MsgSeqNos, Pid, U0, down),
- Actions = [{settled, QRef, ConfirmedSeqNos},
- {rejected, QRef, Rejected} | Actions0],
+ Actions = settlement_action(
+ settled, QRef, Settled,
+ settlement_action(rejected, QRef, Rejected, Actions0)),
{ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions};
true ->
%% any abnormal exit should be considered a full reject of the
@@ -280,6 +282,11 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
[{rejected, QRef, MsgIds} | Actions0]}
end.
+settlement_action(_Type, _QRef, [], Acc) ->
+ Acc;
+settlement_action(Type, QRef, MsgSeqs, Acc) ->
+ [{Type, QRef, MsgSeqs} | Acc].
+
-spec deliver([{amqqueue:amqqueue(), state()}],
Delivery :: term()) ->
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
@@ -356,19 +363,23 @@ qpids(Qs, MsgNo) ->
QRef = amqqueue:get_name(Q),
Actions = [{monitor, QPid, QRef}
| [{monitor, P, QRef} || P <- SPids]] ++ Actions0,
- %% confirm record
+ %% confirm record only if MsgNo isn't undefined
S = case S0 of
#?STATE{unconfirmed = U0} ->
Rec = [QPid | SPids],
- U = U0#{MsgNo => #msg_status{pending = Rec}},
- S0#?STATE{unconfirmed = U};
+ U = case MsgNo of
+ undefined ->
+ U0;
+ _ ->
+ U0#{MsgNo => #msg_status{pending = Rec}}
+ end,
+ S0#?STATE{pid = QPid,
+ unconfirmed = U};
stateless ->
S0
end,
- {[QPid | MPidAcc],
- SPidAcc ++ SPids,
- [{Q, S} | Qs0],
- Actions}
+ {[QPid | MPidAcc], SPidAcc ++ SPids,
+ [{Q, S} | Qs0], Actions}
end, {[], [], [], []}, Qs).
%% internal-ish
diff --git a/src/rabbit_confirms.erl b/src/rabbit_confirms.erl
index 5aa3abdf12..2fe032d1f1 100644
--- a/src/rabbit_confirms.erl
+++ b/src/rabbit_confirms.erl
@@ -63,6 +63,7 @@ confirm(SeqNos, QName, #?MODULE{smallest = Smallest0,
confirm_one(SeqNo, QName, Acc)
end, {[], U0}, SeqNos),
%% check if smallest is in Confirmed
+ %% TODO: this can be optimised by checking in the preceeding foldr
Smallest =
case lists:any(fun ({S, _}) -> S == Smallest0 end, Confirmed) of
true ->
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 727ee43faf..40028c8241 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -367,20 +367,19 @@ recover(VHost, Qs) ->
-spec handle_down(pid(), term(), state()) ->
{ok, state(), actions()} | {eol, queue_ref()} | {error, term()}.
-handle_down(Pid, Info, #?STATE{monitor_registry = Reg} = State0) ->
+handle_down(Pid, Info, #?STATE{monitor_registry = Reg0} = State0) ->
%% lookup queue ref in monitor registry
- case Reg of
- #{Pid := QRef} ->
- %% TODO: remove Pid from monitor_registry
+ case maps:take(Pid, Reg0) of
+ {QRef, Reg} ->
case handle_event(QRef, {down, Pid, Info}, State0) of
{ok, State, Actions} ->
- {ok, State, Actions};
+ {ok, State#?STATE{monitor_registry = Reg}, Actions};
eol ->
{eol, QRef};
Err ->
Err
end;
- _ ->
+ error ->
{ok, State0, []}
end.