summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-11-02 10:39:58 +0000
committermergify-bot <noreply@mergify.io>2021-11-08 13:05:49 +0000
commit066785a246732607fca20309f2b7792532c3f64d (patch)
tree2496561950c071f3b9312263394200073d4e4277
parentca46d458ebcd6d7dbccff7e44c41758c7d46d4d3 (diff)
downloadrabbitmq-server-git-066785a246732607fca20309f2b7792532c3f64d.tar.gz
QQ: try all servers when registering enqueuer
This should avoid channel crashes when the leader has recently changed and the amqqueue record hasn't yet been updated. Also catch errors in the QQ tick temporary process that updates metrics. During shutdown the metrics ETS table may disappear before this process has finished which would result in a noisy error in the logs. (cherry picked from commit 3f4a7caf76311aea1dee764bdc5eaf40e14fa87a)
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl26
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl10
2 files changed, 25 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 7ac1d7ba6e..1544e2be06 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -142,9 +142,10 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
enqueue(Correlation, Msg,
#state{queue_status = undefined,
next_enqueue_seq = 1,
- cfg = #cfg{timeout = Timeout}} = State0) ->
+ cfg = #cfg{servers = Servers,
+ timeout = Timeout}} = State0) ->
%% it is the first enqueue, check the version
- {_, Node} = Server = pick_server(State0),
+ {_, Node} = pick_server(State0),
case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
0 ->
%% the leader is running the old version
@@ -155,14 +156,21 @@ enqueue(Correlation, Msg,
%% were running the new version on the leader do sync initialisation
%% of enqueuer session
Reg = rabbit_fifo:make_register_enqueuer(self()),
- case ra:process_command(Server, Reg, Timeout) of
- {ok, reject_publish, _} ->
- {reject_publish, State0#state{queue_status = reject_publish}};
- {ok, ok, _} ->
- enqueue(Correlation, Msg, State0#state{queue_status = go});
+ case ra:process_command(Servers, Reg, Timeout) of
+ {ok, reject_publish, Leader} ->
+ {reject_publish, State0#state{leader = Leader,
+ queue_status = reject_publish}};
+ {ok, ok, Leader} ->
+ enqueue(Correlation, Msg, State0#state{leader = Leader,
+ queue_status = go});
+ {error, {no_more_servers_to_try, _Errs}} ->
+ %% if we are not able to process the register command
+ %% it is safe to reject the message as we never attempted
+ %% to send it
+ {reject_publish, State0};
+ %% TODO: not convinced this can ever happen when using
+ %% a list of servers
{timeout, _} ->
- %% if we timeout it is probably better to reject
- %% the message than being uncertain
{reject_publish, State0};
Err ->
exit(Err)
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 0131d1538b..d63757a59f 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -415,7 +415,9 @@ handle_tick(QName,
%% this makes calls to remote processes so cannot be run inside the
%% ra server
Self = self(),
- _ = spawn(fun() ->
+ _ = spawn(
+ fun() ->
+ try
R = reductions(Name),
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
Util = case C of
@@ -454,7 +456,11 @@ handle_tick(QName,
ok
end
- end),
+ catch
+ _:_ ->
+ ok
+ end
+ end),
ok.
repair_leader_record(QName, Self) ->