diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-20 12:21:15 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-20 12:21:15 +0100 |
commit | e0df9b4ebbf06e6bcbb14b5ba2a163b8dfeed270 (patch) | |
tree | f726803023e3c8b0d05210d1edb99259b4979e0c | |
parent | e039c90768ea78e9deed108b72702dba49ef0302 (diff) | |
parent | e8aac814c3c08661bee63244a992f5a1fe42749b (diff) | |
download | rabbitmq-server-e0df9b4ebbf06e6bcbb14b5ba2a163b8dfeed270.tar.gz |
MErge bug25813
-rw-r--r-- | src/rabbit.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 58 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 27 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 7 | ||||
-rw-r--r-- | src/rabbit_log.erl | 37 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 20 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 17 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
-rw-r--r-- | src/supervisor2.erl | 9 |
10 files changed, 111 insertions, 84 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 4b8af870..b00a1ad7 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -360,7 +360,7 @@ stop() -> undefined -> ok; _ -> await_startup(true) end, - rabbit_misc:local_info_msg("Stopping RabbitMQ~n", []), + rabbit_log:info("Stopping RabbitMQ~n", []), Apps = ?APPS ++ rabbit_plugins:active(), stop_apps(app_utils:app_dependency_order(Apps, true)). @@ -368,7 +368,7 @@ stop_and_halt() -> try stop() after - rabbit_misc:local_info_msg("Halting Erlang VM~n", []), + rabbit_log:info("Halting Erlang VM~n", []), init:stop() end, ok. @@ -481,7 +481,7 @@ environment() -> rotate_logs(BinarySuffix) -> Suffix = binary_to_list(BinarySuffix), - rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]), + rabbit_log:info("Rotating logs with suffix '~s'~n", [Suffix]), log_rotation_result(rotate_logs(log_location(kernel), Suffix, rabbit_error_logger_file_h), @@ -638,7 +638,7 @@ boot_error(Reason, Fmt, Args, Stacktrace) -> basic_boot_error(Reason, Format, Args) -> io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), - rabbit_misc:local_info_msg(Format, Args), + rabbit_log:info(Format, Args), timer:sleep(1000), exit({?MODULE, failure_during_boot, Reason}). @@ -762,7 +762,7 @@ force_event_refresh(Ref) -> %% misc log_broker_started(Plugins) -> - rabbit_misc:with_local_io( + rabbit_log:with_local_io( fun() -> PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P]) || P <- Plugins]), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a33a8fcc..692179fc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -630,7 +630,7 @@ notify_sent_queue_down(QPid) -> resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). -internal_delete1(QueueName) -> +internal_delete1(QueueName, OnlyDurable) -> ok = mnesia:delete({rabbit_queue, QueueName}), %% this 'guarded' delete prevents unnecessary writes to the mnesia %% disk log @@ -640,7 +640,7 @@ internal_delete1(QueueName) -> end, %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. - rabbit_binding:remove_for_destination(QueueName). + rabbit_binding:remove_for_destination(QueueName, OnlyDurable). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -650,7 +650,7 @@ internal_delete(QueueName) -> {[], []} -> rabbit_misc:const({error, not_found}); _ -> - Deletions = internal_delete1(QueueName), + Deletions = internal_delete1(QueueName, false), T = rabbit_binding:process_deletions(Deletions), fun() -> ok = T(), @@ -677,7 +677,7 @@ forget_all_durable(Node) -> forget_node_for_queue(#amqqueue{name = Name, down_slave_nodes = []}) -> %% No slaves to recover from, queue is gone - rabbit_binding:process_deletions(internal_delete1(Name)); + rabbit_binding:process_deletions(internal_delete1(Name, true)); forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) -> %% Promote a slave while down - it'll happily recover as a master diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7a095e06..d887f26a 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -25,7 +25,7 @@ -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, - remove_for_destination/1, remove_transient_for_destination/1]). + remove_for_destination/2, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- @@ -78,8 +78,8 @@ -> [rabbit_types:infos()]). -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). --spec(remove_for_destination/1 :: - (rabbit_types:binding_destination()) -> deletions()). +-spec(remove_for_destination/2 :: + (rabbit_types:binding_destination(), boolean()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). -spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')). @@ -215,7 +215,8 @@ remove(Binding, InnerFun) -> remove(Src, Dst, B) -> ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), fun mnesia:delete_object/3), - Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), + Deletions = maybe_auto_delete( + B#binding.source, [B], new_deletions(), false), process_deletions(Deletions). list(VHostPath) -> @@ -298,11 +299,11 @@ remove_for_source(SrcName) -> mnesia:match_object(rabbit_route, Match, write) ++ mnesia:match_object(rabbit_semi_durable_route, Match, write))). -remove_for_destination(DstName) -> - remove_for_destination(DstName, fun remove_routes/1). +remove_for_destination(DstName, OnlyDurable) -> + remove_for_destination(DstName, OnlyDurable, fun remove_routes/1). remove_transient_for_destination(DstName) -> - remove_for_destination(DstName, fun remove_transient_routes/1). + remove_for_destination(DstName, false, fun remove_transient_routes/1). %%---------------------------------------------------------------------------- @@ -428,36 +429,47 @@ remove_transient_routes(Routes) -> R#route.binding end || R <- Routes]. -remove_for_destination(DstName, Fun) -> +remove_for_destination(DstName, OnlyDurable, Fun) -> lock_route_tables(), - Match = reverse_route( - #route{binding = #binding{destination = DstName, _ = '_'}}), - Routes = [reverse_route(R) || R <- mnesia:match_object( - rabbit_reverse_route, Match, write)], + MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}}, + MatchRev = reverse_route(MatchFwd), + Routes = case OnlyDurable of + false -> [reverse_route(R) || + R <- mnesia:match_object( + rabbit_reverse_route, MatchRev, write)]; + true -> lists:usort( + mnesia:match_object( + rabbit_durable_route, MatchFwd, write) ++ + mnesia:match_object( + rabbit_semi_durable_route, MatchFwd, write)) + end, Bindings = Fun(Routes), - group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), - lists:keysort(#binding.source, Bindings)). + group_bindings_fold(fun maybe_auto_delete/4, new_deletions(), + lists:keysort(#binding.source, Bindings), OnlyDurable). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_fold(_Fun, Acc, []) -> +group_bindings_fold(_Fun, Acc, [], _OnlyDurable) -> Acc; -group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> - group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs], + OnlyDurable) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable). group_bindings_fold( - Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> - group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); -group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings, + OnlyDurable) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) -> %% Either Removed is [], or its head has a non-matching SrcName. - group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed, + OnlyDurable). -maybe_auto_delete(XName, Bindings, Deletions) -> +maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) -> {Entry, Deletions1} = case mnesia:read({rabbit_exchange, XName}) of [] -> {{undefined, not_deleted, Bindings}, Deletions}; - [X] -> case rabbit_exchange:maybe_auto_delete(X) of + [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of not_deleted -> {{X, not_deleted, Bindings}, Deletions}; {deleted, Deletions2} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 3dc15d6b..f184174c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -25,7 +25,7 @@ info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). +-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). %%---------------------------------------------------------------------------- @@ -90,8 +90,8 @@ -spec(validate_binding/2 :: (rabbit_types:exchange(), rabbit_types:binding()) -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})). --spec(maybe_auto_delete/1:: - (rabbit_types:exchange()) +-spec(maybe_auto_delete/2:: + (rabbit_types:exchange(), boolean()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -spec(serial/1 :: (rabbit_types:exchange()) -> fun((boolean()) -> 'none' | pos_integer())). @@ -418,13 +418,13 @@ call_with_exchange(XName, Fun) -> delete(XName, IfUnused) -> Fun = case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 + true -> fun conditional_delete/2; + false -> fun unconditional_delete/2 end, call_with_exchange( XName, fun (X) -> - case Fun(X) of + case Fun(X, false) of {deleted, X, Bs, Deletions} -> rabbit_binding:process_deletions( rabbit_binding:add_deletion( @@ -438,21 +438,21 @@ validate_binding(X = #exchange{type = XType}, Binding) -> Module = type_to_module(XType), Module:validate_binding(X, Binding). -maybe_auto_delete(#exchange{auto_delete = false}) -> +maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = X) -> - case conditional_delete(X) of +maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) -> + case conditional_delete(X, OnlyDurable) of {error, in_use} -> not_deleted; {deleted, X, [], Deletions} -> {deleted, Deletions} end. -conditional_delete(X = #exchange{name = XName}) -> +conditional_delete(X = #exchange{name = XName}, OnlyDurable) -> case rabbit_binding:has_for_source(XName) of - false -> unconditional_delete(X); + false -> unconditional_delete(X, OnlyDurable); true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = XName}) -> +unconditional_delete(X = #exchange{name = XName}, OnlyDurable) -> %% this 'guarded' delete prevents unnecessary writes to the mnesia %% disk log case mnesia:wread({rabbit_durable_exchange, XName}) of @@ -462,7 +462,8 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_exchange, XName}), ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), - {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + {deleted, X, Bindings, rabbit_binding:remove_for_destination( + XName, OnlyDurable)}. next_serial(XName) -> Serial = peek_serial(XName, write), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b17b7de9..f32a187d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -183,12 +183,13 @@ -record(lim, {prefetch_count = 0, ch_pid, + %% 'Notify' is a boolean that indicates whether a queue should be + %% notified of a change in the limit or volume that may allow it to + %% deliver more messages via the limiter's channel. queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). -%% 'Notify' is a boolean that indicates whether a queue should be -%% notified of a change in the limit or volume that may allow it to -%% deliver more messages via the limiter's channel. +%% mode is of type credit_mode() -record(credit, {credit = 0, mode}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f60336a1..e05ef05a 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -17,6 +17,7 @@ -module(rabbit_log). -export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). +-export([with_local_io/1]). %%---------------------------------------------------------------------------- @@ -37,6 +38,8 @@ -spec(error/1 :: (string()) -> 'ok'). -spec(error/2 :: (string(), [any()]) -> 'ok'). +-spec(with_local_io/1 :: (fun (() -> A)) -> A). + -endif. %%---------------------------------------------------------------------------- @@ -46,11 +49,12 @@ log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). log(Category, Level, Fmt, Args) when is_list(Args) -> case level(Level) =< catlevel(Category) of false -> ok; - true -> (case Level of - info -> fun error_logger:info_msg/2; - warning -> fun error_logger:warning_msg/2; - error -> fun error_logger:error_msg/2 - end)(Fmt, Args) + true -> F = case Level of + info -> fun error_logger:info_msg/2; + warning -> fun error_logger:warning_msg/2; + error -> fun error_logger:error_msg/2 + end, + with_local_io(fun () -> F(Fmt, Args) end) end. info(Fmt) -> log(default, info, Fmt). @@ -61,7 +65,12 @@ error(Fmt) -> log(default, error, Fmt). error(Fmt, Args) -> log(default, error, Fmt, Args). catlevel(Category) -> - {ok, CatLevelList} = application:get_env(rabbit, log_levels), + %% We can get here as part of rabbitmqctl when it is impersonating + %% a node; in which case the env will not be defined. + CatLevelList = case application:get_env(rabbit, log_levels) of + {ok, L} -> L; + undefined -> [] + end, level(proplists:get_value(Category, CatLevelList, info)). %%-------------------------------------------------------------------- @@ -70,3 +79,19 @@ level(info) -> 3; level(warning) -> 2; level(error) -> 1; level(none) -> 0. + +%% Execute Fun using the IO system of the local node (i.e. the node on +%% which the code is executing). Since this is invoked for every log +%% message, we try to avoid unnecessarily churning group_leader/1. +with_local_io(Fun) -> + GL = group_leader(), + Node = node(), + case node(GL) of + Node -> Fun(); + _ -> group_leader(whereis(user), self()), + try + Fun() + after + group_leader(GL, self()) + end + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 7ff88f04..d2456918 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -42,7 +42,6 @@ -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([format/2, format_many/1, format_stderr/2]). --export([with_local_io/1, local_info_msg/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]). @@ -185,8 +184,6 @@ -spec(format/2 :: (string(), [any()]) -> string()). -spec(format_many/1 :: ([{string(), [any()]}]) -> string()). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). --spec(with_local_io/1 :: (fun (() -> A)) -> A). --spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). @@ -644,23 +641,6 @@ format_stderr(Fmt, Args) -> end, ok. -%% Execute Fun using the IO system of the local node (i.e. the node on -%% which the code is executing). -with_local_io(Fun) -> - GL = group_leader(), - group_leader(whereis(user), self()), - try - Fun() - after - group_leader(GL, self()) - end. - -%% Log an info message on the local node using the standard logger. -%% Use this if the call didn't originate on the local node (e.g. -%% rabbitmqctl calls). -local_info_msg(Format, Args) -> - with_local_io(fun () -> rabbit_log:info(Format, Args) end). - unfold(Fun, Init) -> unfold(Fun, [], Init). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a499686f..d5dd9712 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -177,14 +177,13 @@ join_cluster(DiscoveryNode, NodeType) -> reset_gracefully(), %% Join the cluster - rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n", - [ClusterNodes, NodeType]), + rabbit_log:info("Clustering with ~p as ~p node~n", + [ClusterNodes, NodeType]), ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true), rabbit_node_monitor:notify_joined_cluster(), ok; true -> - rabbit_misc:local_info_msg("Already member of cluster: ~p~n", - [ClusterNodes]), + rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]), {ok, already_member} end. @@ -193,12 +192,12 @@ join_cluster(DiscoveryNode, NodeType) -> %% persisted messages reset() -> ensure_mnesia_not_running(), - rabbit_misc:local_info_msg("Resetting Rabbit~n", []), + rabbit_log:info("Resetting Rabbit~n", []), reset_gracefully(). force_reset() -> ensure_mnesia_not_running(), - rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []), + rabbit_log:info("Resetting Rabbit forcefully~n", []), wipe(). reset_gracefully() -> @@ -254,8 +253,8 @@ update_cluster_nodes(DiscoveryNode) -> %% nodes mnesia:delete_schema([node()]), rabbit_node_monitor:write_cluster_status(Status), - rabbit_misc:local_info_msg("Updating cluster nodes from ~p~n", - [DiscoveryNode]), + rabbit_log:info("Updating cluster nodes from ~p~n", + [DiscoveryNode]), init_db_with_mnesia(AllNodes, node_type(), true, true); false -> e(inconsistent_cluster) @@ -278,7 +277,7 @@ forget_cluster_node(Node, RemoveWhenOffline) -> {true, false} -> remove_node_offline_node(Node); {true, true} -> e(online_node_offline_flag); {false, false} -> e(offline_node_no_offline_flag); - {false, true} -> rabbit_misc:local_info_msg( + {false, true} -> rabbit_log:info( "Removing node ~p from cluster~n", [Node]), case remove_node_if_mnesia_running(Node) of ok -> ok; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b2930f88..ca73006a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -412,7 +412,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), rabbit_event:init_stats_timer(State, #v1.stats_timer); -handle_other({'$gen_cast', force_event_refresh}, State) -> +handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> %% Ignore, we will emit a created event once we start running. State; handle_other(ensure_stats, State) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 4eafd3b1..92ecb0df 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -171,6 +171,15 @@ MaxT :: non_neg_integer()}, [ChildSpec :: child_spec()]}} | ignore. +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{init,1}]; +behaviour_info(_Other) -> + undefined. + -endif. -define(restarting(_Pid_), {restarting,_Pid_}). |