summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-12 17:08:16 +0300
committerGitHub <noreply@github.com>2021-11-12 17:08:16 +0300
commit9fc298dffab31e8489d28a5723877fc4a2df063f (patch)
treec24a9d2102917b2585e204f957ac5eb4258d54b4
parent267558b9eb6b58cc9ca24923996b0cc5fbe5d0c5 (diff)
parentbf6c7476070958f986cf8b892c3e1abb295d2c12 (diff)
downloadrabbitmq-server-git-9fc298dffab31e8489d28a5723877fc4a2df063f.tar.gz
Merge pull request #3656 from rabbitmq/qq-messages-pending-fixes
Expose pending enqueues in quorum queue overview
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl8
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl2
2 files changed, 8 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index 71172fce70..e7efb21f61 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -742,6 +742,7 @@ overview(#?MODULE{consumers = Cons,
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
num_ready_messages => messages_ready(State),
+ num_pending_messages => messages_pending(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
release_cursors => [I || {_, I, _} <- lqueue:to_list(Cursors)],
@@ -1011,6 +1012,11 @@ usage(Name) when is_atom(Name) ->
%%% Internal
+messages_pending(#?MODULE{enqueuers = Enqs}) ->
+ maps:fold(fun(_, #enqueuer{pending = P}, Acc) ->
+ length(P) + Acc
+ end, 0, Enqs).
+
messages_ready(#?MODULE{messages = M,
prefix_msgs = {RCnt, _R, PCnt, _P},
returns = R}) ->
@@ -1301,7 +1307,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
#enqueuer{next_seqno = Next,
pending = Pending0} = Enq0
when MsgSeqNo > Next ->
- % out of order delivery
+ % out of order enqueue
Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
{ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index f9beb7928b..9b8920f89e 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -611,7 +611,7 @@ handle_ra_event(Leader, {machine, leader_change}, State0) ->
%% we need to update leader
%% and resend any pending commands
State = resend_all_pending(State0#state{leader = Leader}),
- {ok, State, []};
+ {ok, cancel_timer(State), []};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{ok, State0, []};