summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-01-24 16:57:59 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-01-24 17:02:22 +0100
commitfe47dd79f5d0bb55d13819f2ca5f6fa8cd74a115 (patch)
treefd284c29d20763e04bca83035930998150db2bb7
parenta9a61a6e37073899d721cde40dc4dd1229c4a43a (diff)
downloadrabbitmq-server-git-fe47dd79f5d0bb55d13819f2ca5f6fa8cd74a115.tar.gz
Delete stream2 queue and some fixes to recover
-rw-r--r--src/rabbit_stream2_queue.erl89
1 files changed, 33 insertions, 56 deletions
diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl
index 2245b0ed48..d764db42bc 100644
--- a/src/rabbit_stream2_queue.erl
+++ b/src/rabbit_stream2_queue.erl
@@ -236,12 +236,12 @@ declare(Q0) ->
Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()],
N = ra_lib:derive_safe_string(atom_to_list(Name), 8),
% rabbit_log:info("Declare stream2 in ~s", [Dir]),
- Conf = #{name => N,
- reference => QName},
+ Conf = #{dir => Dir,
+ reference => QName,
+ name => list_to_atom(N)},
{ok, LeaderPid, ReplicaPids} = osiris:start_cluster(N, Replicas, Conf),
- Q1 = amqqueue:set_slave_pids(
- amqqueue:set_pid(Q0, LeaderPid), ReplicaPids),
- NewQ1 = amqqueue:set_type_state(Q1, Conf),
+ Q1 = amqqueue:set_pid(Q0, LeaderPid),
+ NewQ1 = amqqueue:set_type_state(Q1, maps:put(replicas, ReplicaPids, Conf)),
case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, Q} ->
rabbit_event:notify(queue_created,
@@ -271,9 +271,12 @@ recover(Q0) ->
case node(Pid) of
Node ->
Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()],
- {ok, LeaderPid} = osiris:restart_server(Name, Replicas, Conf),
- Q = amqqueue:set_pid(Q0, LeaderPid),
- {ok, _} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q),
+ NewConf = maps:put(replicas, [], Conf),
+ Q1 = amqqueue:set_type_state(Q0, NewConf),
+ {ok, _} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q1),
+ {ok, LeaderPid} = osiris:restart_server(Name, Replicas, NewConf),
+ Q2 = amqqueue:set_pid(Q1, LeaderPid),
+ {ok, _} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q2),
Slaves = restart_replicas_on_nodes(Name, LeaderPid, Replicas, Conf),
update_slaves(QName, Slaves);
_ ->
@@ -309,9 +312,9 @@ restart_replicas_on_nodes(Name, LeaderPid, Replicas, Conf) ->
{ok, Pid} ->
[Pid | Pids];
{error, already_present} ->
- ok;
+ Pids;
{error, {already_started, _}} ->
- ok;
+ Pids;
Error ->
rabbit_log:warning("Error starting stream ~p replica on node ~p: ~p",
[Name, Replica, Error]),
@@ -324,56 +327,33 @@ restart_replicas_on_nodes(Name, LeaderPid, Replicas, Conf) ->
end
end, [], Replicas).
+update_slaves(_QName, []) ->
+ ok;
update_slaves(QName, Slaves0) ->
Fun = fun (Q) ->
- Slaves = amqqueue:get_slave_pids(Q) ++ Slaves0,
- amqqueue:set_slave_pids(Q, Slaves)
+ Conf = amqqueue:get_type_state(Q),
+ Slaves = filter_alive(maps:get(replicas, Conf)) ++ Slaves0,
+ amqqueue:set_type_state(Q, maps:put(replicas, Slaves, Conf))
end,
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok.
+filter_alive(Pids) ->
+ lists:filter(fun(Pid) when node(Pid) == node() ->
+ rabbit_misc:is_process_alive(Pid);
+ (_) ->
+ true
+ end, Pids).
+
delete(Q, ActingUser) when ?amqqueue_is_stream2(Q) ->
- {Name, _} = amqqueue:get_pid(Q),
- QName = amqqueue:get_name(Q),
- QNodes = get_nodes(Q),
- Timeout = ?DELETE_TIMEOUT,
- {ok, ReadyMsgs, _} = stat(Q),
- Servers = [{Name, Node} || Node <- QNodes],
- case ra:delete_cluster(Servers, Timeout) of
- {ok, {_, LeaderNode} = Leader} ->
- MRef = erlang:monitor(process, Leader),
- receive
- {'DOWN', MRef, process, _, _} ->
- ok
- after Timeout ->
- ok = rabbit_ra_queue:force_delete(Servers)
- end,
- ok = delete_queue_data(QName, ActingUser),
- rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
- 1000),
- {ok, ReadyMsgs};
- {error, {no_more_servers_to_try, Errs}} ->
- case lists:all(fun({{error, noproc}, _}) -> true;
- (_) -> false
- end, Errs) of
- true ->
- %% If all ra nodes were already down, the delete
- %% has succeed
- delete_queue_data(QName, ActingUser),
- {ok, ReadyMsgs};
- false ->
- %% attempt forced deletion of all servers
- rabbit_log:warning(
- "Could not delete quorum queue '~s', not enough nodes "
- " online to reach a quorum: ~255p."
- " Attempting force delete.",
- [rabbit_misc:rs(QName), Errs]),
- ok = rabbit_ra_queue:force_delete(Servers),
- delete_queue_data(QName, ActingUser),
- {ok, ReadyMsgs}
- end
- end.
+ Conf = amqqueue:get_type_state(Q),
+ Name = maps:get(name, Conf),
+ osiris:delete_cluster(Name, amqqueue:get_pid(Q), maps:get(replicas, Conf)),
+ delete_queue_data(amqqueue:get_name(Q), ActingUser),
+ %% TODO return number of ready messages
+ {ok, 0}.
+
qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) ->
erlang:binary_to_atom(<<"%2F_", Name/binary>>, utf8);
qname_to_rname(#resource{virtual_host = VHost, name = Name}) ->
@@ -532,7 +512,7 @@ i(_K, _Q) -> ''.
get_nodes(Q) when ?is_amqqueue(Q) ->
[node(amqqueue:get_pid(Q)) |
- [node(P) || P <- amqqueue:get_slave_pids(Q)]].
+ [node(P) || P <- maps:get(replicas, amqqueue:get_type_state(Q))]].
open_files(_Name) ->
%% TODO: this is a lie
@@ -550,6 +530,3 @@ cluster_state(Name) ->
delete_queue_data(QName, ActingUser) ->
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
ok.
-
-stat(_Q) ->
- {ok, 0, 0}.