diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-01-24 16:57:59 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-01-24 17:02:22 +0100 |
commit | fe47dd79f5d0bb55d13819f2ca5f6fa8cd74a115 (patch) | |
tree | fd284c29d20763e04bca83035930998150db2bb7 | |
parent | a9a61a6e37073899d721cde40dc4dd1229c4a43a (diff) | |
download | rabbitmq-server-git-fe47dd79f5d0bb55d13819f2ca5f6fa8cd74a115.tar.gz |
Delete stream2 queue and some fixes to recover
-rw-r--r-- | src/rabbit_stream2_queue.erl | 89 |
1 files changed, 33 insertions, 56 deletions
diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl index 2245b0ed48..d764db42bc 100644 --- a/src/rabbit_stream2_queue.erl +++ b/src/rabbit_stream2_queue.erl @@ -236,12 +236,12 @@ declare(Q0) -> Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()], N = ra_lib:derive_safe_string(atom_to_list(Name), 8), % rabbit_log:info("Declare stream2 in ~s", [Dir]), - Conf = #{name => N, - reference => QName}, + Conf = #{dir => Dir, + reference => QName, + name => list_to_atom(N)}, {ok, LeaderPid, ReplicaPids} = osiris:start_cluster(N, Replicas, Conf), - Q1 = amqqueue:set_slave_pids( - amqqueue:set_pid(Q0, LeaderPid), ReplicaPids), - NewQ1 = amqqueue:set_type_state(Q1, Conf), + Q1 = amqqueue:set_pid(Q0, LeaderPid), + NewQ1 = amqqueue:set_type_state(Q1, maps:put(replicas, ReplicaPids, Conf)), case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, Q} -> rabbit_event:notify(queue_created, @@ -271,9 +271,12 @@ recover(Q0) -> case node(Pid) of Node -> Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()], - {ok, LeaderPid} = osiris:restart_server(Name, Replicas, Conf), - Q = amqqueue:set_pid(Q0, LeaderPid), - {ok, _} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q), + NewConf = maps:put(replicas, [], Conf), + Q1 = amqqueue:set_type_state(Q0, NewConf), + {ok, _} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q1), + {ok, LeaderPid} = osiris:restart_server(Name, Replicas, NewConf), + Q2 = amqqueue:set_pid(Q1, LeaderPid), + {ok, _} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q2), Slaves = restart_replicas_on_nodes(Name, LeaderPid, Replicas, Conf), update_slaves(QName, Slaves); _ -> @@ -309,9 +312,9 @@ restart_replicas_on_nodes(Name, LeaderPid, Replicas, Conf) -> {ok, Pid} -> [Pid | Pids]; {error, already_present} -> - ok; + Pids; {error, {already_started, _}} -> - ok; + Pids; Error -> rabbit_log:warning("Error starting stream ~p replica on node ~p: ~p", [Name, Replica, Error]), @@ -324,56 +327,33 @@ restart_replicas_on_nodes(Name, LeaderPid, Replicas, Conf) -> end end, [], Replicas). +update_slaves(_QName, []) -> + ok; update_slaves(QName, Slaves0) -> Fun = fun (Q) -> - Slaves = amqqueue:get_slave_pids(Q) ++ Slaves0, - amqqueue:set_slave_pids(Q, Slaves) + Conf = amqqueue:get_type_state(Q), + Slaves = filter_alive(maps:get(replicas, Conf)) ++ Slaves0, + amqqueue:set_type_state(Q, maps:put(replicas, Slaves, Conf)) end, rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), ok. +filter_alive(Pids) -> + lists:filter(fun(Pid) when node(Pid) == node() -> + rabbit_misc:is_process_alive(Pid); + (_) -> + true + end, Pids). + delete(Q, ActingUser) when ?amqqueue_is_stream2(Q) -> - {Name, _} = amqqueue:get_pid(Q), - QName = amqqueue:get_name(Q), - QNodes = get_nodes(Q), - Timeout = ?DELETE_TIMEOUT, - {ok, ReadyMsgs, _} = stat(Q), - Servers = [{Name, Node} || Node <- QNodes], - case ra:delete_cluster(Servers, Timeout) of - {ok, {_, LeaderNode} = Leader} -> - MRef = erlang:monitor(process, Leader), - receive - {'DOWN', MRef, process, _, _} -> - ok - after Timeout -> - ok = rabbit_ra_queue:force_delete(Servers) - end, - ok = delete_queue_data(QName, ActingUser), - rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], - 1000), - {ok, ReadyMsgs}; - {error, {no_more_servers_to_try, Errs}} -> - case lists:all(fun({{error, noproc}, _}) -> true; - (_) -> false - end, Errs) of - true -> - %% If all ra nodes were already down, the delete - %% has succeed - delete_queue_data(QName, ActingUser), - {ok, ReadyMsgs}; - false -> - %% attempt forced deletion of all servers - rabbit_log:warning( - "Could not delete quorum queue '~s', not enough nodes " - " online to reach a quorum: ~255p." - " Attempting force delete.", - [rabbit_misc:rs(QName), Errs]), - ok = rabbit_ra_queue:force_delete(Servers), - delete_queue_data(QName, ActingUser), - {ok, ReadyMsgs} - end - end. + Conf = amqqueue:get_type_state(Q), + Name = maps:get(name, Conf), + osiris:delete_cluster(Name, amqqueue:get_pid(Q), maps:get(replicas, Conf)), + delete_queue_data(amqqueue:get_name(Q), ActingUser), + %% TODO return number of ready messages + {ok, 0}. + qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) -> erlang:binary_to_atom(<<"%2F_", Name/binary>>, utf8); qname_to_rname(#resource{virtual_host = VHost, name = Name}) -> @@ -532,7 +512,7 @@ i(_K, _Q) -> ''. get_nodes(Q) when ?is_amqqueue(Q) -> [node(amqqueue:get_pid(Q)) | - [node(P) || P <- amqqueue:get_slave_pids(Q)]]. + [node(P) || P <- maps:get(replicas, amqqueue:get_type_state(Q))]]. open_files(_Name) -> %% TODO: this is a lie @@ -550,6 +530,3 @@ cluster_state(Name) -> delete_queue_data(QName, ActingUser) -> _ = rabbit_amqqueue:internal_delete(QName, ActingUser), ok. - -stat(_Q) -> - {ok, 0, 0}. |