diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-01 12:39:24 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-12 13:37:15 +0000 |
commit | bf6c7476070958f986cf8b892c3e1abb295d2c12 (patch) | |
tree | 0d659356670c9b4819118e829b5b67e8b8b5bd7e | |
parent | 58767cd4fb68240a27c5ebc04984a964611704db (diff) | |
download | rabbitmq-server-git-bf6c7476070958f986cf8b892c3e1abb295d2c12.tar.gz |
Expose pending enqueues in overview output to allow
debugging of situation where messages may be stuck.
Also cancel rabbit_fifo_client timer after message resend to avoid
resending them again when the timer triggers.
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 8 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 2 |
2 files changed, 8 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 71172fce70..e7efb21f61 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -742,6 +742,7 @@ overview(#?MODULE{consumers = Cons, num_checked_out => num_checked_out(State), num_enqueuers => maps:size(Enqs), num_ready_messages => messages_ready(State), + num_pending_messages => messages_pending(State), num_messages => messages_total(State), num_release_cursors => lqueue:len(Cursors), release_cursors => [I || {_, I, _} <- lqueue:to_list(Cursors)], @@ -1011,6 +1012,11 @@ usage(Name) when is_atom(Name) -> %%% Internal +messages_pending(#?MODULE{enqueuers = Enqs}) -> + maps:fold(fun(_, #enqueuer{pending = P}, Acc) -> + length(P) + Acc + end, 0, Enqs). + messages_ready(#?MODULE{messages = M, prefix_msgs = {RCnt, _R, PCnt, _P}, returns = R}) -> @@ -1301,7 +1307,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, #enqueuer{next_seqno = Next, pending = Pending0} = Enq0 when MsgSeqNo > Next -> - % out of order delivery + % out of order enqueue Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 7ac1d7ba6e..fd58030bd2 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -589,7 +589,7 @@ handle_ra_event(Leader, {machine, leader_change}, State0) -> %% we need to update leader %% and resend any pending commands State = resend_all_pending(State0#state{leader = Leader}), - {ok, State, []}; + {ok, cancel_timer(State), []}; handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {ok, State0, []}; |