summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-20 12:21:15 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-20 12:21:15 +0100
commite0df9b4ebbf06e6bcbb14b5ba2a163b8dfeed270 (patch)
treef726803023e3c8b0d05210d1edb99259b4979e0c
parente039c90768ea78e9deed108b72702dba49ef0302 (diff)
parente8aac814c3c08661bee63244a992f5a1fe42749b (diff)
downloadrabbitmq-server-e0df9b4ebbf06e6bcbb14b5ba2a163b8dfeed270.tar.gz
MErge bug25813
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_binding.erl58
-rw-r--r--src/rabbit_exchange.erl27
-rw-r--r--src/rabbit_limiter.erl7
-rw-r--r--src/rabbit_log.erl37
-rw-r--r--src/rabbit_misc.erl20
-rw-r--r--src/rabbit_mnesia.erl17
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/supervisor2.erl9
10 files changed, 111 insertions, 84 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 4b8af870..b00a1ad7 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -360,7 +360,7 @@ stop() ->
undefined -> ok;
_ -> await_startup(true)
end,
- rabbit_misc:local_info_msg("Stopping RabbitMQ~n", []),
+ rabbit_log:info("Stopping RabbitMQ~n", []),
Apps = ?APPS ++ rabbit_plugins:active(),
stop_apps(app_utils:app_dependency_order(Apps, true)).
@@ -368,7 +368,7 @@ stop_and_halt() ->
try
stop()
after
- rabbit_misc:local_info_msg("Halting Erlang VM~n", []),
+ rabbit_log:info("Halting Erlang VM~n", []),
init:stop()
end,
ok.
@@ -481,7 +481,7 @@ environment() ->
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
- rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]),
+ rabbit_log:info("Rotating logs with suffix '~s'~n", [Suffix]),
log_rotation_result(rotate_logs(log_location(kernel),
Suffix,
rabbit_error_logger_file_h),
@@ -638,7 +638,7 @@ boot_error(Reason, Fmt, Args, Stacktrace) ->
basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
- rabbit_misc:local_info_msg(Format, Args),
+ rabbit_log:info(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot, Reason}).
@@ -762,7 +762,7 @@ force_event_refresh(Ref) ->
%% misc
log_broker_started(Plugins) ->
- rabbit_misc:with_local_io(
+ rabbit_log:with_local_io(
fun() ->
PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
|| P <- Plugins]),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a33a8fcc..692179fc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -630,7 +630,7 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
-internal_delete1(QueueName) ->
+internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
%% this 'guarded' delete prevents unnecessary writes to the mnesia
%% disk log
@@ -640,7 +640,7 @@ internal_delete1(QueueName) ->
end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
- rabbit_binding:remove_for_destination(QueueName).
+ rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -650,7 +650,7 @@ internal_delete(QueueName) ->
{[], []} ->
rabbit_misc:const({error, not_found});
_ ->
- Deletions = internal_delete1(QueueName),
+ Deletions = internal_delete1(QueueName, false),
T = rabbit_binding:process_deletions(Deletions),
fun() ->
ok = T(),
@@ -677,7 +677,7 @@ forget_all_durable(Node) ->
forget_node_for_queue(#amqqueue{name = Name,
down_slave_nodes = []}) ->
%% No slaves to recover from, queue is gone
- rabbit_binding:process_deletions(internal_delete1(Name));
+ rabbit_binding:process_deletions(internal_delete1(Name, true));
forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
%% Promote a slave while down - it'll happily recover as a master
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7a095e06..d887f26a 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -25,7 +25,7 @@
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
- remove_for_destination/1, remove_transient_for_destination/1]).
+ remove_for_destination/2, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
@@ -78,8 +78,8 @@
-> [rabbit_types:infos()]).
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
--spec(remove_for_destination/1 ::
- (rabbit_types:binding_destination()) -> deletions()).
+-spec(remove_for_destination/2 ::
+ (rabbit_types:binding_destination(), boolean()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')).
@@ -215,7 +215,8 @@ remove(Binding, InnerFun) ->
remove(Src, Dst, B) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3),
- Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
+ Deletions = maybe_auto_delete(
+ B#binding.source, [B], new_deletions(), false),
process_deletions(Deletions).
list(VHostPath) ->
@@ -298,11 +299,11 @@ remove_for_source(SrcName) ->
mnesia:match_object(rabbit_route, Match, write) ++
mnesia:match_object(rabbit_semi_durable_route, Match, write))).
-remove_for_destination(DstName) ->
- remove_for_destination(DstName, fun remove_routes/1).
+remove_for_destination(DstName, OnlyDurable) ->
+ remove_for_destination(DstName, OnlyDurable, fun remove_routes/1).
remove_transient_for_destination(DstName) ->
- remove_for_destination(DstName, fun remove_transient_routes/1).
+ remove_for_destination(DstName, false, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -428,36 +429,47 @@ remove_transient_routes(Routes) ->
R#route.binding
end || R <- Routes].
-remove_for_destination(DstName, Fun) ->
+remove_for_destination(DstName, OnlyDurable, Fun) ->
lock_route_tables(),
- Match = reverse_route(
- #route{binding = #binding{destination = DstName, _ = '_'}}),
- Routes = [reverse_route(R) || R <- mnesia:match_object(
- rabbit_reverse_route, Match, write)],
+ MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}},
+ MatchRev = reverse_route(MatchFwd),
+ Routes = case OnlyDurable of
+ false -> [reverse_route(R) ||
+ R <- mnesia:match_object(
+ rabbit_reverse_route, MatchRev, write)];
+ true -> lists:usort(
+ mnesia:match_object(
+ rabbit_durable_route, MatchFwd, write) ++
+ mnesia:match_object(
+ rabbit_semi_durable_route, MatchFwd, write))
+ end,
Bindings = Fun(Routes),
- group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
- lists:keysort(#binding.source, Bindings)).
+ group_bindings_fold(fun maybe_auto_delete/4, new_deletions(),
+ lists:keysort(#binding.source, Bindings), OnlyDurable).
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.
-group_bindings_fold(_Fun, Acc, []) ->
+group_bindings_fold(_Fun, Acc, [], _OnlyDurable) ->
Acc;
-group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) ->
- group_bindings_fold(Fun, SrcName, Acc, Bs, [B]).
+group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs],
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable).
group_bindings_fold(
- Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) ->
- group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]);
-group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
+ Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings,
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable);
+group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) ->
%% Either Removed is [], or its head has a non-matching SrcName.
- group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
+ group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed,
+ OnlyDurable).
-maybe_auto_delete(XName, Bindings, Deletions) ->
+maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case mnesia:read({rabbit_exchange, XName}) of
[] -> {{undefined, not_deleted, Bindings}, Deletions};
- [X] -> case rabbit_exchange:maybe_auto_delete(X) of
+ [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of
not_deleted ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, Deletions2} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 3dc15d6b..f184174c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -25,7 +25,7 @@
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
+-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
%%----------------------------------------------------------------------------
@@ -90,8 +90,8 @@
-spec(validate_binding/2 ::
(rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})).
--spec(maybe_auto_delete/1::
- (rabbit_types:exchange())
+-spec(maybe_auto_delete/2::
+ (rabbit_types:exchange(), boolean())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-spec(serial/1 :: (rabbit_types:exchange()) ->
fun((boolean()) -> 'none' | pos_integer())).
@@ -418,13 +418,13 @@ call_with_exchange(XName, Fun) ->
delete(XName, IfUnused) ->
Fun = case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
+ true -> fun conditional_delete/2;
+ false -> fun unconditional_delete/2
end,
call_with_exchange(
XName,
fun (X) ->
- case Fun(X) of
+ case Fun(X, false) of
{deleted, X, Bs, Deletions} ->
rabbit_binding:process_deletions(
rabbit_binding:add_deletion(
@@ -438,21 +438,21 @@ validate_binding(X = #exchange{type = XType}, Binding) ->
Module = type_to_module(XType),
Module:validate_binding(X, Binding).
-maybe_auto_delete(#exchange{auto_delete = false}) ->
+maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) ->
not_deleted;
-maybe_auto_delete(#exchange{auto_delete = true} = X) ->
- case conditional_delete(X) of
+maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) ->
+ case conditional_delete(X, OnlyDurable) of
{error, in_use} -> not_deleted;
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
-conditional_delete(X = #exchange{name = XName}) ->
+conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
case rabbit_binding:has_for_source(XName) of
- false -> unconditional_delete(X);
+ false -> unconditional_delete(X, OnlyDurable);
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = XName}) ->
+unconditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
%% this 'guarded' delete prevents unnecessary writes to the mnesia
%% disk log
case mnesia:wread({rabbit_durable_exchange, XName}) of
@@ -462,7 +462,8 @@ unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
- {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
+ {deleted, X, Bindings, rabbit_binding:remove_for_destination(
+ XName, OnlyDurable)}.
next_serial(XName) ->
Serial = peek_serial(XName, write),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index b17b7de9..f32a187d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -183,12 +183,13 @@
-record(lim, {prefetch_count = 0,
ch_pid,
+ %% 'Notify' is a boolean that indicates whether a queue should be
+ %% notified of a change in the limit or volume that may allow it to
+ %% deliver more messages via the limiter's channel.
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
-%% 'Notify' is a boolean that indicates whether a queue should be
-%% notified of a change in the limit or volume that may allow it to
-%% deliver more messages via the limiter's channel.
+%% mode is of type credit_mode()
-record(credit, {credit = 0, mode}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index f60336a1..e05ef05a 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -17,6 +17,7 @@
-module(rabbit_log).
-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
+-export([with_local_io/1]).
%%----------------------------------------------------------------------------
@@ -37,6 +38,8 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
+-spec(with_local_io/1 :: (fun (() -> A)) -> A).
+
-endif.
%%----------------------------------------------------------------------------
@@ -46,11 +49,12 @@ log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
log(Category, Level, Fmt, Args) when is_list(Args) ->
case level(Level) =< catlevel(Category) of
false -> ok;
- true -> (case Level of
- info -> fun error_logger:info_msg/2;
- warning -> fun error_logger:warning_msg/2;
- error -> fun error_logger:error_msg/2
- end)(Fmt, Args)
+ true -> F = case Level of
+ info -> fun error_logger:info_msg/2;
+ warning -> fun error_logger:warning_msg/2;
+ error -> fun error_logger:error_msg/2
+ end,
+ with_local_io(fun () -> F(Fmt, Args) end)
end.
info(Fmt) -> log(default, info, Fmt).
@@ -61,7 +65,12 @@ error(Fmt) -> log(default, error, Fmt).
error(Fmt, Args) -> log(default, error, Fmt, Args).
catlevel(Category) ->
- {ok, CatLevelList} = application:get_env(rabbit, log_levels),
+ %% We can get here as part of rabbitmqctl when it is impersonating
+ %% a node; in which case the env will not be defined.
+ CatLevelList = case application:get_env(rabbit, log_levels) of
+ {ok, L} -> L;
+ undefined -> []
+ end,
level(proplists:get_value(Category, CatLevelList, info)).
%%--------------------------------------------------------------------
@@ -70,3 +79,19 @@ level(info) -> 3;
level(warning) -> 2;
level(error) -> 1;
level(none) -> 0.
+
+%% Execute Fun using the IO system of the local node (i.e. the node on
+%% which the code is executing). Since this is invoked for every log
+%% message, we try to avoid unnecessarily churning group_leader/1.
+with_local_io(Fun) ->
+ GL = group_leader(),
+ Node = node(),
+ case node(GL) of
+ Node -> Fun();
+ _ -> group_leader(whereis(user), self()),
+ try
+ Fun()
+ after
+ group_leader(GL, self())
+ end
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 7ff88f04..d2456918 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -42,7 +42,6 @@
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([format/2, format_many/1, format_stderr/2]).
--export([with_local_io/1, local_info_msg/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
@@ -185,8 +184,6 @@
-spec(format/2 :: (string(), [any()]) -> string()).
-spec(format_many/1 :: ([{string(), [any()]}]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
--spec(with_local_io/1 :: (fun (() -> A)) -> A).
--spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
@@ -644,23 +641,6 @@ format_stderr(Fmt, Args) ->
end,
ok.
-%% Execute Fun using the IO system of the local node (i.e. the node on
-%% which the code is executing).
-with_local_io(Fun) ->
- GL = group_leader(),
- group_leader(whereis(user), self()),
- try
- Fun()
- after
- group_leader(GL, self())
- end.
-
-%% Log an info message on the local node using the standard logger.
-%% Use this if the call didn't originate on the local node (e.g.
-%% rabbitmqctl calls).
-local_info_msg(Format, Args) ->
- with_local_io(fun () -> rabbit_log:info(Format, Args) end).
-
unfold(Fun, Init) ->
unfold(Fun, [], Init).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a499686f..d5dd9712 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -177,14 +177,13 @@ join_cluster(DiscoveryNode, NodeType) ->
reset_gracefully(),
%% Join the cluster
- rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
- [ClusterNodes, NodeType]),
+ rabbit_log:info("Clustering with ~p as ~p node~n",
+ [ClusterNodes, NodeType]),
ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
rabbit_node_monitor:notify_joined_cluster(),
ok;
true ->
- rabbit_misc:local_info_msg("Already member of cluster: ~p~n",
- [ClusterNodes]),
+ rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]),
{ok, already_member}
end.
@@ -193,12 +192,12 @@ join_cluster(DiscoveryNode, NodeType) ->
%% persisted messages
reset() ->
ensure_mnesia_not_running(),
- rabbit_misc:local_info_msg("Resetting Rabbit~n", []),
+ rabbit_log:info("Resetting Rabbit~n", []),
reset_gracefully().
force_reset() ->
ensure_mnesia_not_running(),
- rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []),
+ rabbit_log:info("Resetting Rabbit forcefully~n", []),
wipe().
reset_gracefully() ->
@@ -254,8 +253,8 @@ update_cluster_nodes(DiscoveryNode) ->
%% nodes
mnesia:delete_schema([node()]),
rabbit_node_monitor:write_cluster_status(Status),
- rabbit_misc:local_info_msg("Updating cluster nodes from ~p~n",
- [DiscoveryNode]),
+ rabbit_log:info("Updating cluster nodes from ~p~n",
+ [DiscoveryNode]),
init_db_with_mnesia(AllNodes, node_type(), true, true);
false ->
e(inconsistent_cluster)
@@ -278,7 +277,7 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
{true, false} -> remove_node_offline_node(Node);
{true, true} -> e(online_node_offline_flag);
{false, false} -> e(offline_node_no_offline_flag);
- {false, true} -> rabbit_misc:local_info_msg(
+ {false, true} -> rabbit_log:info(
"Removing node ~p from cluster~n", [Node]),
case remove_node_if_mnesia_running(Node) of
ok -> ok;
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b2930f88..ca73006a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -412,7 +412,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
rabbit_event:init_stats_timer(State, #v1.stats_timer);
-handle_other({'$gen_cast', force_event_refresh}, State) ->
+handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
%% Ignore, we will emit a created event once we start running.
State;
handle_other(ensure_stats, State) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 4eafd3b1..92ecb0df 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -171,6 +171,15 @@
MaxT :: non_neg_integer()},
[ChildSpec :: child_spec()]}}
| ignore.
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{init,1}];
+behaviour_info(_Other) ->
+ undefined.
+
-endif.
-define(restarting(_Pid_), {restarting,_Pid_}).