diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-02 10:39:58 +0000 |
---|---|---|
committer | mergify-bot <noreply@mergify.io> | 2021-11-08 13:05:49 +0000 |
commit | 066785a246732607fca20309f2b7792532c3f64d (patch) | |
tree | 2496561950c071f3b9312263394200073d4e4277 | |
parent | ca46d458ebcd6d7dbccff7e44c41758c7d46d4d3 (diff) | |
download | rabbitmq-server-git-066785a246732607fca20309f2b7792532c3f64d.tar.gz |
QQ: try all servers when registering enqueuer
This should avoid channel crashes when the leader has recently changed
and the amqqueue record hasn't yet been updated.
Also catch errors in the QQ tick temporary process that updates metrics.
During shutdown, the metrics ETS table may disappear before this process has finished,
which would result in a noisy error in the logs.
(cherry picked from commit 3f4a7caf76311aea1dee764bdc5eaf40e14fa87a)
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 26 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 10 |
2 files changed, 25 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 7ac1d7ba6e..1544e2be06 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -142,9 +142,10 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> enqueue(Correlation, Msg, #state{queue_status = undefined, next_enqueue_seq = 1, - cfg = #cfg{timeout = Timeout}} = State0) -> + cfg = #cfg{servers = Servers, + timeout = Timeout}} = State0) -> %% it is the first enqueue, check the version - {_, Node} = Server = pick_server(State0), + {_, Node} = pick_server(State0), case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of 0 -> %% the leader is running the old version @@ -155,14 +156,21 @@ enqueue(Correlation, Msg, %% were running the new version on the leader do sync initialisation %% of enqueuer session Reg = rabbit_fifo:make_register_enqueuer(self()), - case ra:process_command(Server, Reg, Timeout) of - {ok, reject_publish, _} -> - {reject_publish, State0#state{queue_status = reject_publish}}; - {ok, ok, _} -> - enqueue(Correlation, Msg, State0#state{queue_status = go}); + case ra:process_command(Servers, Reg, Timeout) of + {ok, reject_publish, Leader} -> + {reject_publish, State0#state{leader = Leader, + queue_status = reject_publish}}; + {ok, ok, Leader} -> + enqueue(Correlation, Msg, State0#state{leader = Leader, + queue_status = go}); + {error, {no_more_servers_to_try, _Errs}} -> + %% if we are not able to process the register command + %% it is safe to reject the message as we never attempted + %% to send it + {reject_publish, State0}; + %% TODO: not convinced this can ever happen when using + %% a list of servers {timeout, _} -> - %% if we timeout it is probably better to reject - %% the message than being uncertain {reject_publish, State0}; Err -> exit(Err) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 
0131d1538b..d63757a59f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -415,7 +415,9 @@ handle_tick(QName, %% this makes calls to remote processes so cannot be run inside the %% ra server Self = self(), - _ = spawn(fun() -> + _ = spawn( + fun() -> + try R = reductions(Name), rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), Util = case C of @@ -454,7 +456,11 @@ handle_tick(QName, ok end - end), + catch + _:_ -> + ok + end + end), ok. repair_leader_record(QName, Self) -> |