diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 13:04:39 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 13:04:39 +0000 |
commit | 59e619dce920d03adf55907324f4cd4ec1095350 (patch) | |
tree | 8998ca29da7e9340c8e368567ffbbc7123959f49 | |
parent | c5bd7a3553f94c1eec1e8d5bfa62747cf7622d54 (diff) | |
parent | cbf31df3f7e86ecd91746df92a504d3c64b85b38 (diff) | |
download | rabbitmq-server-59e619dce920d03adf55907324f4cd4ec1095350.tar.gz |
merge default into bug23749
-rwxr-xr-x | scripts/rabbitmq-plugins.bat | 6 | ||||
-rw-r--r-- | src/rabbit.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 83 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 27 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 5 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 121 | ||||
-rw-r--r-- | src/rabbit_runtime_parameter.erl | 3 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 22 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters_test.erl | 6 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 50 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 4 |
12 files changed, 204 insertions, 142 deletions
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 713d7000..4b4dbe47 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -23,8 +23,12 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_SERVICENAME!"=="" (
+ set RABBITMQ_SERVICENAME=RabbitMQ
+)
+
if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/src/rabbit.erl b/src/rabbit.erl index c004c489..f3ba022a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -355,6 +355,8 @@ handle_app_error(App, Reason) -> throw({could_not_start, App, Reason}). start_it(StartFun) -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + register(rabbit_boot, Marker), try StartFun() catch @@ -363,11 +365,17 @@ start_it(StartFun) -> _:Reason -> boot_error(Reason, erlang:get_stacktrace()) after + unlink(Marker), + Marker ! stop, %% give the error loggers some time to catch up timer:sleep(100) end. stop() -> + case whereis(rabbit_boot) of + undefined -> ok; + _ -> await_startup() + end, rabbit_log:info("Stopping RabbitMQ~n"), ok = app_utils:stop_applications(app_shutdown_order()). @@ -703,7 +711,7 @@ log_broker_started(Plugins) -> PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P]) || P <- Plugins]), error_logger:info_msg( - "Server startup complete; ~b plugins started.~n~s~n", + "Server startup complete; ~b plugins started.~n~s", [length(Plugins), PluginList]), io:format(" completed with ~p plugins.~n", [length(Plugins)]) end). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index aed25344..aec0074a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -612,6 +612,15 @@ handle_method(_Method, _, State = #ch{state = closing}) -> handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> {ok, State1} = notify_queues(State), + %% We issue the channel.close_ok response after a handshake with + %% the reader, the other half of which is ready_for_close. That + %% way the reader forgets about the channel before we send the + %% response (and this channel process terminates). If we didn't do + %% that, a channel.open for the same channel number, which a + %% client is entitled to send as soon as it has received the + %% close_ok, might be received by the reader before it has seen + %% the termination and hence be sent to the old, now dead/dying + %% channel process, instead of a new process, and thus lost. ReaderPid ! {channel_closing, self()}, {noreply, State1}; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 05036d35..4fb1fc3b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -32,6 +32,8 @@ [policy_validator, <<"ha-mode">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, {requires, rabbit_registry}, {enables, recovery}]}). @@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of {ok, SPid} when is_pid(SPid) -> + maybe_auto_sync(Q), rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), {ok, started}; @@ -235,13 +238,13 @@ suggested_queue_nodes(Q) -> %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. suggested_queue_nodes(Q, PossibleNodes) -> - {MNode0, SNodes} = actual_queue_nodes(Q), + {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, PossibleNodes). + {MNode, SNodes, SSNodes}, PossibleNodes). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -249,15 +252,20 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> - {MNode, Possible -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) -> + {MNode, Poss -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) -> Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - %% If the current master is currently not in the nodes specified, - %% act like it is for the purposes below - otherwise we will not - %% return it in the results... - Nodes = lists:usort([MNode | Nodes1]), - Unavailable = Nodes -- Possible, + %% If the current master is not in the nodes specified, then what we want + %% to do depends on whether there are any synchronised slaves. If there + %% are then we can just kill the current master - the admin has asked for + %% a migration and we should give it to them. If there are not however + %% then we must keep the master around so as not to lose messages. + Nodes = case SSNodes of + [] -> lists:usort([MNode | Nodes1]); + _ -> Nodes1 + end, + Unavailable = Nodes -- Poss, Available = Nodes -- Unavailable, case Available of [] -> %% We have never heard of anything? Not much we can do but @@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> {MNode, []}; _ -> case lists:member(MNode, Available) of true -> {MNode, Available -- [MNode]}; - false -> promote_slave(Available) + false -> %% Make sure the new master is synced! In order to + %% get here SSNodes must not be empty. + [NewMNode | _] = SSNodes, + {NewMNode, Available -- [NewMNode]} end end; %% When we need to add nodes, we randomise our candidate list as a %% crude form of load-balancing. TODO it would also be nice to -%% randomise the list of ones to remove when we have too many - but -%% that would fail to take account of synchronisation... -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> +%% randomise the list of ones to remove when we have too many - we +%% would have to take account of synchronisation though. +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of - true -> Cand = shuffle((Possible -- [MNode]) -- SNodes), + true -> Cand = shuffle((Poss -- [MNode]) -- SNodes), SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); false -> lists:sublist(SNodes, SCount) end}; -suggested_queue_nodes(_, _, {MNode, _}, _) -> +suggested_queue_nodes(_, _, {MNode, _, _}, _) -> {MNode, []}. shuffle(L) -> @@ -288,11 +299,14 @@ shuffle(L) -> {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), L1. -actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> +actual_queue_nodes(#amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids}) -> + Nodes = fun (L) -> [node(Pid) || Pid <- L] end, {case MPid of none -> none; _ -> node(MPid) - end, [node(Pid) || Pid <- SPids]}. + end, Nodes(SPids), Nodes(SSPids)}. is_mirrored(Q) -> case policy(<<"ha-mode">>, Q) of @@ -302,6 +316,14 @@ is_mirrored(Q) -> _ -> false end. +maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> + case policy(<<"ha-sync-mode">>, Q) of + <<"automatic">> -> + spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end); + _ -> + ok + end. + update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of @@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, update_mirrors0(OldQ = #amqqueue{name = QName}, NewQ = #amqqueue{name = QName}) -> - All = fun ({A,B}) -> [A|B] end, - OldNodes = All(actual_queue_nodes(OldQ)), - NewNodes = All(suggested_queue_nodes(NewQ)), - add_mirrors(QName, NewNodes -- OldNodes), + {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), + {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), + OldNodes = [OldMNode | OldSNodes], + NewNodes = [NewMNode | NewSNodes], + add_mirrors (QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), + maybe_auto_sync(NewQ), ok. %%---------------------------------------------------------------------------- validate_policy(KeyList) -> - validate_policy( - proplists:get_value(<<"ha-mode">>, KeyList), - proplists:get_value(<<"ha-params">>, KeyList, none)). + case validate_policy( + proplists:get_value(<<"ha-mode">>, KeyList), + proplists:get_value(<<"ha-params">>, KeyList, none)) of + ok -> case proplists:get_value( + <<"ha-sync-mode">>, KeyList, <<"manual">>) of + <<"automatic">> -> ok; + <<"manual">> -> ok; + Mode -> {error, "ha-sync-mode must be \"manual\" " + "or \"automatic\", got ~p", [Mode]} + end; + E -> E + end. validate_policy(<<"all">>, none) -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 69a3be2b..b435e0f3 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -830,16 +830,21 @@ update_ram_duration(BQ, BQS) -> rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQ:set_ram_duration_target(DesiredDuration, BQS1). +%% [1] - the arrival of this newly synced slave may cause the master to die if +%% the admin has requested a migration-type change to policy. record_synchronised(#amqqueue { name = QName }) -> Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q = #amqqueue { sync_slave_pids = SSPids }] -> - rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { sync_slave_pids = [Self | SSPids] }), - ok - end - end). + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue { sync_slave_pids = SSPids }] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2), + {ok, Q1, Q2} + end + end) of + ok -> ok; + {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1] + end. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index e712078b..7398cd2d 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -26,7 +26,7 @@ -export([register/0]). -export([name/1, get/2, set/1]). --export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([validate/4, notify/4, notify_clear/3]). -export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, list_formatted/1, info_keys/0]). @@ -146,9 +146,6 @@ validate(_VHost, <<"policy">>, Name, Term) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). -validate_clear(_VHost, <<"policy">>, _Name) -> - ok. - notify(VHost, <<"policy">>, _Name, _Term) -> update_policies(VHost). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index af7aac6f..b8ff9c9f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -295,26 +295,35 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> case rabbit_net:recv(Sock) of - {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); - closed -> case State#v1.connection_state of - closed -> State; - _ -> throw(connection_closed_abruptly) - end; - {error, Reason} -> throw({inet_error, Reason}); - {other, Other} -> handle_other(Other, Deb, State) + {data, Data} -> + recvloop(Deb, State#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); + closed when State#v1.connection_state =:= closed -> + ok; + closed -> + throw(connection_closed_abruptly); + {error, Reason} -> + throw({inet_error, Reason}); + {other, {system, From, Request}} -> + sys:handle_system_msg(Request, From, State#v1.parent, + ?MODULE, Deb, State); + {other, Other} -> + case handle_other(Other, State) of + stop -> ok; + NewState -> recvloop(Deb, NewState) + end end. -handle_other({conserve_resources, Conserve}, Deb, +handle_other({conserve_resources, Conserve}, State = #v1{throttle = Throttle}) -> Throttle1 = Throttle#throttle{conserve_resources = Conserve}, - recvloop(Deb, control_throttle(State#v1{throttle = Throttle1})); -handle_other({channel_closing, ChPid}, Deb, State) -> + control_throttle(State#v1{throttle = Throttle1}); +handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(control_throttle(State))); -handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + maybe_close(control_throttle(State)); +handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to @@ -326,57 +335,54 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> %% initiated by our parent it is probably more important to exit %% quickly. exit(Reason); -handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, - _Deb, _State) -> +handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) -> throw(E); -handle_other({channel_exit, Channel, Reason}, Deb, State) -> - mainloop(Deb, handle_exception(State, Channel, Reason)); -handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); -handle_other(terminate_connection, _Deb, State) -> - State; -handle_other(handshake_timeout, Deb, State) +handle_other({channel_exit, Channel, Reason}, State) -> + handle_exception(State, Channel, Reason); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> + handle_dependent_exit(ChPid, Reason, State); +handle_other(terminate_connection, _State) -> + stop; +handle_other(handshake_timeout, State) when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> - mainloop(Deb, State); -handle_other(handshake_timeout, _Deb, State) -> + State; +handle_other(handshake_timeout, State) -> throw({handshake_timeout, State#v1.callback}); -handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) -> - mainloop(Deb, State); -handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> +handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> + State; +handle_other(heartbeat_timeout, #v1{connection_state = S}) -> throw({heartbeat_timeout, S}); -handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> +handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) + force -> stop; + normal -> NewState end; -handle_other({'$gen_call', From, info}, Deb, State) -> +handle_other({'$gen_call', From, info}, State) -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); -handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + State; +handle_other({'$gen_call', From, {info, Items}}, State) -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) + State; +handle_other({'$gen_cast', force_event_refresh}, State) when ?IS_RUNNING(State) -> rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + State; +handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. - mainloop(Deb, State); -handle_other(ensure_stats, Deb, State) -> - mainloop(Deb, ensure_stats_timer(State)); -handle_other(emit_stats, Deb, State) -> - mainloop(Deb, emit_stats(State)); -handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); -handle_other({bump_credit, Msg}, Deb, State) -> + State; +handle_other(ensure_stats, State) -> + ensure_stats_timer(State); +handle_other(emit_stats, State) -> + emit_stats(State); +handle_other({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - recvloop(Deb, control_throttle(State)); -handle_other(Other, _Deb, _State) -> + control_throttle(State); +handle_other(Other, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -437,13 +443,13 @@ close_connection(State = #v1{queue_collector = Collector, handle_dependent_exit(ChPid, Reason, State) -> case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, uncontrolled} -> - exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> - maybe_close(control_throttle(State)); - {Channel, uncontrolled} -> - maybe_close(handle_exception(control_throttle(State), - Channel, Reason)) + {undefined, controlled} -> State; + {undefined, uncontrolled} -> exit({abnormal_dependent_exit, + ChPid, Reason}); + {_Channel, controlled} -> maybe_close(control_throttle(State)); + {Channel, uncontrolled} -> State1 = handle_exception( + State, Channel, Reason), + maybe_close(control_throttle(State1)) end. terminate_channels() -> @@ -636,7 +642,10 @@ process_frame(Frame, Channel, State) -> post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - State; + %% This is not strictly necessary, but more obviously + %% correct. Also note that we do not need to call maybe_close/1 + %% since we cannot possibly be in the 'closing' state. + control_throttle(State); post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> maybe_block(State); post_process_frame({content_body, _}, _ChPid, State) -> diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl index 8a237105..6b62c974 100644 --- a/src/rabbit_runtime_parameter.erl +++ b/src/rabbit_runtime_parameter.erl @@ -23,8 +23,6 @@ -callback validate(rabbit_types:vhost(), binary(), binary(), term()) -> validate_results(). --callback validate_clear(rabbit_types:vhost(), binary(), - binary()) -> validate_results(). -callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'. -callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'. @@ -35,7 +33,6 @@ behaviour_info(callbacks) -> [ {validate, 4}, - {validate_clear, 3}, {notify, 4}, {notify_clear, 3} ]; diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 2615372c..b1100b65 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -120,21 +120,13 @@ clear(VHost, Component, Name) -> clear_any(VHost, Component, Name). clear_any(VHost, Component, Name) -> - case clear_any0(VHost, Component, Name) of - ok -> ok; - {errors, L} -> format_error(L) - end. - -clear_any0(VHost, Component, Name) -> - case lookup_component(Component) of - {ok, Mod} -> case flatten_errors( - Mod:validate_clear(VHost, Component, Name)) of - ok -> mnesia_clear(VHost, Component, Name), - Mod:notify_clear(VHost, Component, Name), - ok; - E -> E - end; - E -> E + case lookup(VHost, Component, Name) of + not_found -> {error_string, "Parameter does not exist"}; + _ -> mnesia_clear(VHost, Component, Name), + case lookup_component(Component) of + {ok, Mod} -> Mod:notify_clear(VHost, Component, Name); + _ -> ok + end end. mnesia_clear(VHost, Component, Name) -> diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index c27f1b4a..05c1dbc1 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -18,7 +18,7 @@ -behaviour(rabbit_runtime_parameter). -behaviour(rabbit_policy_validator). --export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([validate/4, notify/4, notify_clear/3]). -export([register/0, unregister/0]). -export([validate_policy/1]). -export([register_policy_validator/0, unregister_policy_validator/0]). @@ -35,10 +35,6 @@ validate(_, <<"test">>, <<"good">>, _Term) -> ok; validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok; validate(_, <<"test">>, _, _) -> {error, "meh", []}. -validate_clear(_, <<"test">>, <<"good">>) -> ok; -validate_clear(_, <<"test">>, <<"maybe">>) -> ok; -validate_clear(_, <<"test">>, _) -> {error, "meh", []}. - notify(_, _, _, _) -> ok. notify_clear(_, _, _) -> ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e2af7efd..6db8b3d0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -936,10 +936,10 @@ test_arguments_parser() -> test_dynamic_mirroring() -> %% Just unit tests of the node selection logic, see multi node %% tests for the rest... - Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) -> + Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) -> {NewM, NewSs0} = rabbit_mirror_queue_misc:suggested_queue_nodes( - Policy, Params, {OldM, OldSs}, All), + Policy, Params, CurrentState, All), NewSs1 = lists:sort(NewSs0), case dm_list_match(NewSs, NewSs1, ExtraSs) of ok -> ok; @@ -947,28 +947,36 @@ test_dynamic_mirroring() -> end end, - Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]), + + N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end, %% Add a node - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), - Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]), + Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]), %% Add two nodes and drop one - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]), %% Don't try to include nodes that are not running - Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]), %% If we can't find any of the nodes listed then just keep the master - Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), - %% And once that's happened, still keep the master even when not listed - Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]), - - Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]), - Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]), - Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]), - Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]), + %% And once that's happened, still keep the master even when not listed, + %% if nothing is synced + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]), + %% But if something is synced we can lose the master - but make + %% sure we pick the new master from the nodes which are synced! + Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]), + Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]), + + Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]), + Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]), + Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]), passed. @@ -1086,7 +1094,11 @@ test_runtime_parameters() -> ok = control_action(clear_parameter, ["test", "maybe"]), {error_string, _} = control_action(clear_parameter, ["test", "neverexisted"]), + + %% We can delete for a component that no longer exists + Good(["test", "good", "\"ignore\""]), rabbit_runtime_parameters_test:unregister(), + ok = control_action(clear_parameter, ["test", "good"]), passed. test_policy_validation() -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 8d2cbc41..d0f39221 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -95,9 +95,9 @@ internal_delete(VHostPath) -> || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)], [ok = rabbit_runtime_parameters:clear(VHostPath, proplists:get_value(component, Info), - proplists:get_value(key, Info)) + proplists:get_value(name, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], - [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info)) + [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. |