author     Matthias Radestock <matthias@rabbitmq.com>    2012-09-26 14:06:10 +0100
committer  Matthias Radestock <matthias@rabbitmq.com>    2012-09-26 14:06:10 +0100
commit     b5e92f44d67620e008b8eaabaeadc922e76a763e (patch)
tree       788103ba155053ffcf141406492a64585d2f376c
parent     c372383769afd21d27bf9be00fd521d83bf664bb (diff)
parent     e6c877d3cdb6ca369d4d9c94af6187794affaac0 (diff)
download   rabbitmq-server-bug25179.tar.gz
merge default into bug25179 (bug25179)
-rw-r--r--  README                              |   2
-rw-r--r--  include/rabbit.hrl                  |   2
-rw-r--r--  src/app_utils.erl                   |  41
-rw-r--r--  src/pmon.erl                        |  32
-rw-r--r--  src/rabbit.erl                      |  42
-rw-r--r--  src/rabbit_amqqueue.erl             |  42
-rw-r--r--  src/rabbit_amqqueue_process.erl     |  28
-rw-r--r--  src/rabbit_basic.erl                |  32
-rw-r--r--  src/rabbit_binding.erl              |   8
-rw-r--r--  src/rabbit_channel.erl              |  23
-rw-r--r--  src/rabbit_control_main.erl         |   2
-rw-r--r--  src/rabbit_direct.erl               |  15
-rw-r--r--  src/rabbit_error_logger.erl         |   2
-rw-r--r--  src/rabbit_mirror_queue_master.erl  |  11
-rw-r--r--  src/rabbit_mirror_queue_slave.erl   | 149
-rw-r--r--  src/rabbit_mnesia.erl               |  56
-rw-r--r--  src/rabbit_node_monitor.erl         | 161
-rw-r--r--  src/rabbit_tests.erl                |   5
-rw-r--r--  src/rabbit_types.erl                |   1
19 files changed, 299 insertions(+), 355 deletions(-)
diff --git a/README b/README
index 67e3a66a..90e99e62 100644
--- a/README
+++ b/README
@@ -1 +1 @@
-Please see http://www.rabbitmq.com/build-server.html for build instructions.
+Please see http://www.rabbitmq.com/build-server.html for build instructions.
\ No newline at end of file
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d6fac46d..fff92205 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -73,7 +73,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 4bef83a5..fdf6ed41 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -15,17 +15,21 @@
%%
-module(app_utils).
--export([load_applications/1, start_applications/1,
- stop_applications/1, app_dependency_order/2,
+-export([load_applications/1, start_applications/1, start_applications/2,
+ stop_applications/1, stop_applications/2, app_dependency_order/2,
wait_for_applications/1]).
-ifdef(use_specs).
--spec load_applications([atom()]) -> 'ok'.
--spec start_applications([atom()]) -> 'ok'.
--spec stop_applications([atom()]) -> 'ok'.
--spec wait_for_applications([atom()]) -> 'ok'.
--spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+-type error_handler() :: fun((atom(), any()) -> 'ok').
+
+-spec load_applications([atom()]) -> 'ok'.
+-spec start_applications([atom()]) -> 'ok'.
+-spec stop_applications([atom()]) -> 'ok'.
+-spec start_applications([atom()], error_handler()) -> 'ok'.
+-spec stop_applications([atom()], error_handler()) -> 'ok'.
+-spec wait_for_applications([atom()]) -> 'ok'.
+-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
-endif.
@@ -37,21 +41,34 @@ load_applications(Apps) ->
ok.
start_applications(Apps) ->
+ start_applications(
+ Apps, fun (App, Reason) ->
+ throw({error, {cannot_start_application, App, Reason}})
+ end).
+
+stop_applications(Apps) ->
+ stop_applications(
+ Apps, fun (App, Reason) ->
+ throw({error, {cannot_stop_application, App, Reason}})
+ end).
+
+start_applications(Apps, ErrorHandler) ->
manage_applications(fun lists:foldl/3,
fun application:start/1,
fun application:stop/1,
already_started,
- cannot_start_application,
+ ErrorHandler,
Apps).
-stop_applications(Apps) ->
+stop_applications(Apps, ErrorHandler) ->
manage_applications(fun lists:foldr/3,
fun application:stop/1,
fun application:start/1,
not_started,
- cannot_stop_application,
+ ErrorHandler,
Apps).
+
wait_for_applications(Apps) ->
[wait_for_application(App) || App <- Apps], ok.
@@ -107,14 +124,14 @@ app_dependencies(App) ->
{ok, Lst} -> Lst
end.
-manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+manage_applications(Iterate, Do, Undo, SkipError, ErrorHandler, Apps) ->
Iterate(fun (App, Acc) ->
case Do(App) of
ok -> [App | Acc];
{error, {SkipError, _}} -> Acc;
{error, Reason} ->
lists:foreach(Undo, Acc),
- throw({error, {ErrorTag, App, Reason}})
+ ErrorHandler(App, Reason)
end
end, [], Apps),
ok.
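
For illustration only, a minimal sketch (not part of the patch) of a caller using the new start_applications/2 with a custom error handler. The application list and the logging are invented; the handler matches the error_handler() type fun((atom(), any()) -> 'ok') declared above and, like the default handlers, is expected to throw rather than return normally on a real failure.

    %% Hypothetical caller (sketch): log the failing application before
    %% giving up, then rethrow in the same shape as the default handler.
    start_my_apps(Apps) ->
        ok = app_utils:load_applications(Apps),
        ok = app_utils:start_applications(
               Apps,
               fun (App, Reason) ->
                       error_logger:error_msg(
                         "could not start ~p: ~p~n", [App, Reason]),
                       throw({error, {cannot_start_application, App, Reason}})
               end).
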
diff --git a/src/pmon.erl b/src/pmon.erl
index 45786577..1aeebb72 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -27,37 +27,39 @@
-opaque(?MODULE() :: dict()).
+-type(item() :: pid() | {atom(), node()}).
+
-spec(new/0 :: () -> ?MODULE()).
--spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
--spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()).
--spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
--spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
--spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
--spec(monitored/1 :: (?MODULE()) -> [pid()]).
+-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
+-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
+-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
+-spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()).
+-spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()).
+-spec(monitored/1 :: (?MODULE()) -> [item()]).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-endif.
new() -> dict:new().
-monitor(Pid, M) ->
- case dict:is_key(Pid, M) of
+monitor(Item, M) ->
+ case dict:is_key(Item, M) of
true -> M;
- false -> dict:store(Pid, erlang:monitor(process, Pid), M)
+ false -> dict:store(Item, erlang:monitor(process, Item), M)
end.
-monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids).
+monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
-demonitor(Pid, M) ->
- case dict:find(Pid, M) of
+demonitor(Item, M) ->
+ case dict:find(Item, M) of
{ok, MRef} -> erlang:demonitor(MRef),
- dict:erase(Pid, M);
+ dict:erase(Item, M);
error -> M
end.
-is_monitored(Pid, M) -> dict:is_key(Pid, M).
+is_monitored(Item, M) -> dict:is_key(Item, M).
-erase(Pid, M) -> dict:erase(Pid, M).
+erase(Item, M) -> dict:erase(Item, M).
monitored(M) -> dict:fetch_keys(M).
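
A brief sketch (assuming nothing beyond the specs above) of what the widened item() type buys: the same pmon can now track registered-name items such as {rabbit, Node} alongside plain pids, which is how rabbit_node_monitor uses it later in this patch.

    %% Sketch: monitor a pid and a {RegisteredName, Node} item in one pmon.
    track(QPid, Node) ->
        M0 = pmon:new(),
        M1 = pmon:monitor(QPid, M0),
        M2 = pmon:monitor({rabbit, Node}, M1),
        true = pmon:is_monitored({rabbit, Node}, M2),
        %% a later 'DOWN' for either item can be cleared with pmon:erase/2
        M2.
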
diff --git a/src/rabbit.erl b/src/rabbit.erl
index afa97ddc..7b417b00 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -303,7 +303,8 @@ start() ->
ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
- ok = app_utils:start_applications(app_startup_order()),
+ ok = app_utils:start_applications(
+ app_startup_order(), fun handle_app_error/2),
ok = print_plugin_info(rabbit_plugins:active())
end).
@@ -323,10 +324,17 @@ boot() ->
ok = app_utils:load_applications(ToBeLoaded),
StartupApps = app_utils:app_dependency_order(ToBeLoaded,
false),
- ok = app_utils:start_applications(StartupApps),
+ ok = app_utils:start_applications(
+ StartupApps, fun handle_app_error/2),
ok = print_plugin_info(Plugins)
end).
+handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
+ boot_error({could_not_start, App, Reason}, not_available);
+
+handle_app_error(App, Reason) ->
+ boot_error({could_not_start, App, Reason}, not_available).
+
start_it(StartFun) ->
try
StartFun()
@@ -450,7 +458,7 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
+ _:Reason -> boot_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -489,14 +497,14 @@ sort_boot_steps(UnsortedSteps) ->
{mfa, {M,F,A}} <- Attributes,
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
- MissingFunctions -> boot_error(
+ MissingFunctions -> basic_boot_error(
"Boot step functions not exported: ~p~n",
[MissingFunctions])
end;
{error, {vertex, duplicate, StepName}} ->
- boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ basic_boot_error("Duplicate boot step name: ~w~n", [StepName]);
{error, {edge, Reason, From, To}} ->
- boot_error(
+ basic_boot_error(
"Could not add boot step dependency of ~w on ~w:~n~s",
[To, From,
case Reason of
@@ -510,7 +518,7 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
-boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
case AllNodes -- [node()] of
@@ -521,15 +529,19 @@ boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
end,
- boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
-
-boot_step_error(Reason, Stacktrace) ->
- boot_error("Error description:~n ~p~n~n"
- "Log files (may contain more information):~n ~s~n ~s~n~n"
- "Stack trace:~n ~p~n~n",
- [Reason, log_location(kernel), log_location(sasl), Stacktrace]).
+ basic_boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
+
+boot_error(Reason, Stacktrace) ->
+ Fmt = "Error description:~n ~p~n~n"
+ "Log files (may contain more information):~n ~s~n ~s~n~n",
+ Args = [Reason, log_location(kernel), log_location(sasl)],
+ case Stacktrace of
+ not_available -> basic_boot_error(Fmt, Args);
+ _ -> basic_boot_error(Fmt ++ "Stack trace:~n ~p~n~n",
+ Args ++ [Stacktrace])
+ end.
-boot_error(Format, Args) ->
+basic_boot_error(Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
error_logger:error_msg(Format, Args),
timer:sleep(1000),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b2473f91..4a20a1bc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -60,7 +60,7 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(routing_result() :: 'routed' | 'unroutable').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -650,18 +650,17 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
-deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+deliver([], #delivery{mandatory = false}, _Flow) ->
%% /dev/null optimisation
{routed, []};
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
+deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
+ %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver
+ %% will deliver the message to the queue process asynchronously,
+ %% and return true, which means all the QPids will always be
+ %% returned. It is therefore safe to use a fire-and-forget cast
+ %% here and return the QPids - the semantics is preserved. This
+ %% scales much better than the case below.
QPids = qpids(Qs),
case Flow of
flow -> [credit_flow:send(QPid) || QPid <- QPids];
@@ -673,21 +672,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
end),
{routed, QPids};
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
- _Flow) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
+deliver(Qs, Delivery, _Flow) ->
+ case delegate:invoke(
+ qpids(Qs), fun (QPid) ->
+ ok = gen_server2:call(QPid, {deliver, Delivery},
+ infinity)
+ end) of
+ {[], _} -> {unroutable, []};
+ {R , _} -> {routed, [QPid || {QPid, ok} <- R]}
end.
qpids(Qs) -> lists:append([[QPid | SPids] ||
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e647627c..0e3f0bac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -767,7 +767,7 @@ dead_letter_fun(Reason, _State) ->
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
DLMsg = make_dead_letter_msg(Reason, Msg, State),
- Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
@@ -1029,27 +1029,9 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
- %% FIXME: Is this correct semantics?
- %%
- %% I'm worried in particular about the case where an exchange has
- %% two queues against a particular routing key, and a message is
- %% sent in immediate mode through the binding. In non-immediate
- %% mode, both queues get the message, saving it for later if
- %% there's noone ready to receive it just now. In immediate mode,
- %% should both queues still get the message, somehow, or should
- %% just all ready-to-consume queues get the message, with unready
- %% queues discarding the message?
- %%
- Confirm = should_confirm_message(Delivery, State),
- {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
- reply(Delivered, case Delivered of
- true -> maybe_record_confirm_message(Confirm, State1);
- false -> discard_delivery(Delivery, State1)
- end);
-
-handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
- gen_server2:reply(From, true),
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, State));
handle_call({notify_down, ChPid}, From, State) ->
@@ -1195,7 +1177,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
State = #q{senders = Senders}) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ %% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
pmon:monitor(Sender, Senders);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 734456d3..db2b7e95 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/4, publish/6, publish/1,
+-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/3, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -40,13 +40,13 @@
-spec(publish/4 ::
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+-spec(publish/5 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(),
properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/4 ::
- (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/3 ::
+ (boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -80,18 +80,16 @@
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+ publish(Exchange, RoutingKeyBin, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- publish(delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined)).
+publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(X, delivery(Mandatory, Message, undefined));
+publish(XName, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(delivery(Mandatory, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
@@ -105,8 +103,8 @@ publish(X, Delivery) ->
{RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
-delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
+delivery(Mandatory, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
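
For illustration, a sketch of the reduced arities once the 'immediate' flag is gone. The exchange name, routing key and payload are placeholders, and the snippet assumes the rabbit.hrl and rabbit_framing.hrl includes for the records it mentions.

    %% Sketch: publish/5 drops the old Immediate argument and delivery/3
    %% the old record field; only Mandatory remains.
    publish_example() ->
        XName  = rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
        Props  = #'P_basic'{delivery_mode = 2},
        %% Result is {ok, routed | unroutable, QPids} or {error, not_found}
        Result = rabbit_basic:publish(XName, <<"some.key">>, false, Props,
                                      <<"payload">>),
        Msg = rabbit_basic:message(XName, <<"some.key">>, Props, <<"payload">>),
        _Delivery = rabbit_basic:delivery(false, Msg, undefined),
        Result.
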
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 2e462354..0d23f716 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -169,9 +169,9 @@ add(Binding, InnerFun) ->
add(Src, Dst, B) ->
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
- case (not (SrcDurable andalso DstDurable) orelse
- mnesia:read({rabbit_durable_route, B}) =:= []) of
- true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ case (SrcDurable andalso DstDurable andalso
+ mnesia:read({rabbit_durable_route, B}) =/= []) of
+ false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
x_callback(transaction, Src, add_binding, B),
Serial = rabbit_exchange:serial(Src),
@@ -179,7 +179,7 @@ add(Src, Dst, B) ->
x_callback(Serial, Src, add_binding, B),
ok = rabbit_event:notify(binding_created, info(B))
end;
- false -> rabbit_misc:const({error, binding_not_found})
+ true -> rabbit_misc:const({error, binding_not_found})
end.
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e50e823c..0d13312b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -136,7 +136,7 @@ flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
list_local() ->
@@ -598,10 +598,12 @@ handle_method(_Method, _, #ch{tx_status = TxStatus})
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
+handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "immediate=true", []);
+
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
+ mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx_status = TxStatus,
confirm_enabled = ConfirmEnabled,
@@ -623,8 +625,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_trace_in(Message, TraceState),
- Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
- MsgSeqNo),
+ Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
{noreply,
case TxStatus of
@@ -1342,20 +1343,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QPid <- DeliveredQPids]], publish, State2),
State2.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
-process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, _, _, undefined, _, State) ->
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
State#ch.unconfirmed)}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index a6c4fe67..e75e1f6f 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -461,7 +461,7 @@ action(list_parameters, Node, [], Opts, Inform) ->
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
- N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []),
+ N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]),
Action <- [status, cluster_status, environment]],
VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a669a2b3..689e5d83 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -31,7 +31,8 @@
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user()),
+-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() |
+ {rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
@@ -59,7 +60,7 @@ list_local() ->
pg_local:get_members(rabbit_direct).
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_direct, list_local, []).
%%----------------------------------------------------------------------------
@@ -74,10 +75,17 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
{error, access_refused}
end;
+connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
+ connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid,
+ Infos);
+
connect(Username, VHost, Protocol, Pid, Infos) ->
+ connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos).
+
+connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true ->
- case rabbit_access_control:check_user_login(Username, []) of
+ case rabbit_access_control:FunctionName(U, P) of
{ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
{refused, _M, _A} -> {error, auth_failure}
end;
@@ -85,6 +93,7 @@ connect(Username, VHost, Protocol, Pid, Infos) ->
{error, broker_not_found_on_node}
end.
+
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
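
A hedged sketch of the new username/password form of connect/5; the credentials, vhost, protocol module and empty infos list are placeholders rather than anything prescribed by the patch.

    %% Sketch: direct connections can now authenticate with a
    %% {Username, Password} pair as well as a bare username or #user{}.
    direct_login() ->
        case rabbit_direct:connect({<<"guest">>, <<"guest">>}, <<"/">>,
                                   rabbit_framing_amqp_0_9_1, self(), []) of
            {ok, _UserAndServerProps} -> ok;
            {error, Reason}           -> {error, Reason}
        end.
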
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index f1672f4e..a9af2d8a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -81,7 +81,7 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
{ok, _RoutingRes, _DeliveredQPids} =
- rabbit_basic:publish(LogExch, RoutingKey, false, false,
+ rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
list_to_binary(io_lib:format(Format, Data))),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c11a8ff7..41389815 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -87,12 +87,11 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- MNodes1 =
- (case MNodes of
- all -> rabbit_mnesia:all_clustered_nodes();
- undefined -> [];
- _ -> MNodes
- end) -- [node()],
+ MNodes1 = (case MNodes of
+ all -> rabbit_mnesia:cluster_nodes(all);
+ undefined -> [];
+ _ -> MNodes
+ end) -- [node()],
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3e45f026..039b2749 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -70,7 +70,7 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
ack_num,
@@ -167,27 +167,10 @@ init_it(Self, Node, QueueName) ->
end
end.
-handle_call({deliver, Delivery = #delivery { immediate = true }},
- From, State) ->
- %% It is safe to reply 'false' here even if a) we've not seen the
- %% msg via gm, or b) the master dies before we receive the msg via
- %% gm. In the case of (a), we will eventually receive the msg via
- %% gm, and it's only the master's result to the channel that is
- %% important. In the case of (b), if the master does die and we do
- %% get promoted then at that point we have no consumers, thus
- %% 'false' is precisely the correct answer. However, we must be
- %% careful to _not_ enqueue the message in this case.
-
- %% Note this is distinct from the case where we receive the msg
- %% via gm first, then we're promoted to master, and only then do
- %% we receive the msg from the channel.
- gen_server2:reply(From, false), %% master may deliver it, not us
- noreply(maybe_enqueue_message(Delivery, false, State));
-
-handle_call({deliver, Delivery = #delivery { mandatory = true }},
- From, State) ->
- gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- noreply(maybe_enqueue_message(Delivery, true, State));
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
@@ -232,12 +215,12 @@ handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ %% Asynchronous, non-"mandatory" deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);
noflow -> ok
end,
- noreply(maybe_enqueue_message(Delivery, true, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -554,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- {Delivery, true} <- queue:to_list(PubQ)],
+ Delivery <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, KS, MTC),
@@ -655,14 +638,13 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
sender = ChPid },
- EnqueueOnPromotion,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, {confirmed, ChPid}} ->
@@ -732,10 +714,9 @@ process_instruction(
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, {published, ChPid}, MS)};
- {{value, {Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
@@ -747,7 +728,7 @@ process_instruction(
ChPid, [MsgSeqNo]),
MS
end};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -784,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{empty, _MQ} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, discarded, MS)};
- {{value, {#delivery { message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, #delivery { message = #basic_message { id = MsgId } }},
+ MQ2} ->
%% We've already seen it from the channel, we're not
%% going to see this again, so don't add it to MS
{MQ2, PendingCh, MS};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -819,27 +800,23 @@ process_instruction({drop, Length, Dropped, AckRequired},
end, State, lists:duplicate(ToDrop, const)),
{ok, case AckRequired of
true -> State1;
- false -> set_synchronised(ToDrop - Dropped, State1)
+ false -> update_delta(ToDrop - Dropped, State1)
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {State1, Delta} =
- case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- {maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 }),
- 0};
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 });
+ _ when QLen =< Remaining andalso AckRequired ->
+ State;
_ when QLen =< Remaining ->
- {State, case AckRequired of
- true -> 0;
- false -> -1
- end}
- end,
- {ok, set_synchronised(Delta, State1)};
+ update_delta(-1, State)
+ end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -847,9 +824,9 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 })};
+ {ok, update_delta(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -878,8 +855,8 @@ process_instruction({sender_death, ChPid},
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(
- 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
+ {ok, set_delta(Depth - BQ:depth(BQS), State)};
+
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -905,38 +882,34 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(Delta, State) ->
- set_synchronised(Delta, false, State).
+set_delta(0, State = #state { depth_delta = undefined }) ->
+ ok = record_synchronised(State#state.q),
+ State #state { depth_delta = 0 };
+set_delta(NewDelta, State = #state { depth_delta = undefined }) ->
+ true = NewDelta > 0, %% assertion
+ State #state { depth_delta = NewDelta };
+set_delta(NewDelta, State = #state { depth_delta = Delta }) ->
+ update_delta(NewDelta - Delta, State).
-set_synchronised(_Delta, _AddAnyway,
- State = #state { depth_delta = undefined }) ->
+update_delta(_DeltaChange, State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(Delta, AddAnyway,
- State = #state { depth_delta = DepthDelta,
- q = #amqqueue { name = QName }}) ->
- DepthDelta1 = DepthDelta + Delta,
- %% We intentionally leave out the head where a slave becomes
- %% unsynchronised: we assert that can never happen.
- %% The `AddAnyway' param is there since in the `depth' instruction we
- %% receive the master depth for the first time, and we want to set the sync
- %% state anyway if we are synced.
- case DepthDelta1 =:= 0 of
- true when not (DepthDelta =:= 0) orelse AddAnyway ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- %% We might be there already, in the `AddAnyway'
- %% case
- SSPids1 = SSPids -- [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
- end
- end);
- _ when DepthDelta1 >= 0 ->
- ok
- end,
- State #state { depth_delta = DepthDelta1 }.
+update_delta( DeltaChange, State = #state { depth_delta = 0 }) ->
+ 0 = DeltaChange, %% assertion: we cannot become unsync'ed
+ State;
+update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
+ true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
+ set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+
+record_synchronised(#amqqueue { name = QName }) ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q = #amqqueue { sync_slave_pids = SSPids }] ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
+ ok
+ end
+ end).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index f7a355be..bfecf06a 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -263,20 +263,16 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
true -> ok;
false -> e(not_a_cluster_node)
end,
- case mnesia:system_info(is_running) of
- no when RemoveWhenOffline ->
- remove_node_offline_node(Node);
- yes when RemoveWhenOffline ->
- e(online_node_offline_flag);
- no ->
- e(offline_node_no_offline_flag);
- yes ->
- rabbit_misc:local_info_msg("Removing node ~p from cluster~n",
- [Node]),
- case remove_node_if_mnesia_running(Node) of
- ok -> ok;
- {error, _} = Err -> throw(Err)
- end
+ case {RemoveWhenOffline, mnesia:system_info(is_running)} of
+ {true, no} -> remove_node_offline_node(Node);
+ {true, yes} -> e(online_node_offline_flag);
+ {false, no} -> e(offline_node_no_offline_flag);
+ {false, yes} -> rabbit_misc:local_info_msg(
+ "Removing node ~p from cluster~n", [Node]),
+ case remove_node_if_mnesia_running(Node) of
+ ok -> ok;
+ {error, _} = Err -> throw(Err)
+ end
end.
remove_node_offline_node(Node) ->
@@ -324,6 +320,8 @@ status() ->
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
+cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
+
%% This function is the actual source of information, since it gets
%% the data from mnesia. Obviously it'll work only when mnesia is
%% running.
@@ -352,7 +350,7 @@ mnesia_nodes() ->
end
end.
-cluster_nodes(WhichNodes) ->
+cluster_status(WhichNodes) ->
%% I don't want to call `running_nodes/1' unless if necessary, since it's
%% pretty expensive.
{AllNodes1, DiscNodes1, RunningNodesThunk} =
@@ -556,9 +554,9 @@ check_cluster_consistency(Node) ->
%%--------------------------------------------------------------------
on_node_up(Node) ->
- case running_disc_nodes() =:= [Node] of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
- false -> ok
+ case running_disc_nodes() of
+ [Node] -> rabbit_log:info("cluster contains disc nodes again~n");
+ _ -> ok
end.
on_node_down(_Node) ->
@@ -568,7 +566,7 @@ on_node_down(_Node) ->
end.
running_disc_nodes() ->
- {_AllNodes, DiscNodes, RunningNodes} = cluster_nodes(status),
+ {_AllNodes, DiscNodes, RunningNodes} = cluster_status(status),
ordsets:to_list(ordsets:intersection(ordsets:from_list(DiscNodes),
ordsets:from_list(RunningNodes))).
@@ -583,18 +581,16 @@ discover_cluster(Nodes) when is_list(Nodes) ->
discover_cluster(Node) ->
OfflineError =
{error, {cannot_discover_cluster,
- "The nodes provided is either offline or not running"}},
+ "The nodes provided are either offline or not running"}},
case node() of
- Node->
- {error, {cannot_discover_cluster,
- "You provided the current node as node to cluster with"}};
- _ ->
- case rpc:call(Node,
- rabbit_mnesia, cluster_status_from_mnesia, []) of
- {badrpc, _Reason} -> OfflineError;
- {error, mnesia_not_running} -> OfflineError;
- {ok, Res} -> {ok, Res}
- end
+ Node -> {error, {cannot_discover_cluster,
+ "Cannot cluster node with itself"}};
+ _ -> case rpc:call(Node,
+ rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
+ end
end.
schema_ok_or_move() ->
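
A small, purely illustrative sketch of the unified cluster_nodes/1 query that the call sites in this patch switch to (in place of the old all_clustered_nodes/0 and running_clustered_nodes/0 helpers); it assumes it runs on a node where rabbit_mnesia is set up.

    %% Sketch: one entry point, selected by atom, backed by cluster_status/1.
    cluster_summary() ->
        All     = rabbit_mnesia:cluster_nodes(all),
        Disc    = rabbit_mnesia:cluster_nodes(disc),
        Running = rabbit_mnesia:cluster_nodes(running),
        [{all, All}, {disc, Disc}, {running, Running}].
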
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index c1572762..026aa362 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,29 +18,16 @@
-behaviour(gen_server).
+-export([start_link/0]).
-export([running_nodes_filename/0,
- cluster_status_filename/0,
- prepare_cluster_status_files/0,
- write_cluster_status/1,
- read_cluster_status/0,
- update_cluster_status/0,
- reset_cluster_status/0,
-
- joined_cluster/2,
- notify_joined_cluster/0,
- left_cluster/1,
- notify_left_cluster/1,
- node_up/2,
- notify_node_up/0,
-
- start_link/0,
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3
- ]).
+ cluster_status_filename/0, prepare_cluster_status_files/0,
+ write_cluster_status/1, read_cluster_status/0,
+ update_cluster_status/0, reset_cluster_status/0]).
+-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -49,6 +36,8 @@
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+
-spec(running_nodes_filename/0 :: () -> string()).
-spec(cluster_status_filename/0 :: () -> string()).
-spec(prepare_cluster_status_files/0 :: () -> 'ok').
@@ -57,27 +46,31 @@
-spec(update_cluster_status/0 :: () -> 'ok').
-spec(reset_cluster_status/0 :: () -> 'ok').
--spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
-spec(notify_joined_cluster/0 :: () -> 'ok').
--spec(left_cluster/1 :: (node()) -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(node_up/2 :: (node(), boolean()) -> 'ok').
--spec(notify_node_up/0 :: () -> 'ok').
-endif.
%%----------------------------------------------------------------------------
+%% Start
+%%----------------------------------------------------------------------------
+
+start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%----------------------------------------------------------------------------
%% Cluster file operations
%%----------------------------------------------------------------------------
-%% The cluster file information is kept in two files. The "cluster status file"
-%% contains all the clustered nodes and the disc nodes. The "running nodes
-%% file" contains the currently running nodes or the running nodes at shutdown
-%% when the node is down.
+%% The cluster file information is kept in two files. The "cluster
+%% status file" contains all the clustered nodes and the disc nodes.
+%% The "running nodes file" contains the currently running nodes or
+%% the running nodes at shutdown when the node is down.
%%
-%% We strive to keep the files up to date and we rely on this assumption in
-%% various situations. Obviously when mnesia is offline the information we have
-%% will be outdated, but it can't be otherwise.
+%% We strive to keep the files up to date and we rely on this
+%% assumption in various situations. Obviously when mnesia is offline
+%% the information we have will be outdated, but it cannot be
+%% otherwise.
running_nodes_filename() ->
filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
@@ -94,8 +87,8 @@ prepare_cluster_status_files() ->
{error, enoent} -> []
end,
ThisNode = [node()],
- %% The running nodes file might contain a set or a list, in case of the
- %% legacy file
+ %% The running nodes file might contain a set or a list, in case
+ %% of the legacy file
RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
{AllNodes1, WantDiscNode} =
case try_read_file(cluster_status_filename()) of
@@ -131,13 +124,6 @@ write_cluster_status({All, Disc, Running}) ->
{FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
end.
-try_read_file(FileName) ->
- case rabbit_file:read_term_file(FileName) of
- {ok, Term} -> {ok, Term};
- {error, enoent} -> {error, enoent};
- {error, E} -> throw({error, {cannot_read_file, FileName, E}})
- end.
-
read_cluster_status() ->
case {try_read_file(cluster_status_filename()),
try_read_file(running_nodes_filename())} of
@@ -158,49 +144,44 @@ reset_cluster_status() ->
%% Cluster notifications
%%----------------------------------------------------------------------------
-joined_cluster(Node, IsDiscNode) ->
- gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}).
+notify_node_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ gen_server:abcast(Nodes, ?SERVER,
+ {node_up, node(), rabbit_mnesia:node_type()}),
+ %% register other active rabbits with this rabbit
+ DiskNodes = rabbit_mnesia:cluster_nodes(disc),
+ [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
+ true -> disc;
+ false -> ram
+ end}) || N <- Nodes],
+ ok.
notify_joined_cluster() ->
- cluster_multicall(joined_cluster, [node(), rabbit_mnesia:node_type()]),
+ Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ gen_server:abcast(Nodes, ?SERVER,
+ {joined_cluster, node(), rabbit_mnesia:node_type()}),
ok.
-left_cluster(Node) ->
- gen_server:cast(?SERVER, {left_cluster, Node}).
-
notify_left_cluster(Node) ->
- left_cluster(Node),
- cluster_multicall(left_cluster, [Node]),
- ok.
-
-node_up(Node, IsDiscNode) ->
- gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}).
-
-notify_node_up() ->
- Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:node_type()]),
- %% register other active rabbits with this rabbit
- [ node_up(N, lists:member(N, rabbit_mnesia:cluster_nodes(disc))) ||
- N <- Nodes ],
+ Nodes = rabbit_mnesia:cluster_nodes(running),
+ gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
ok.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-init([]) ->
- {ok, no_state}.
+init([]) -> {ok, pmon:new()}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-%% Note: when updating the status file, we can't simply write the mnesia
-%% information since the message can (and will) overtake the mnesia propagation.
-handle_cast({node_up, Node, NodeType}, State) ->
- case is_already_monitored({rabbit, Node}) of
- true -> {noreply, State};
+%% Note: when updating the status file, we can't simply write the
+%% mnesia information since the message can (and will) overtake the
+%% mnesia propagation.
+handle_cast({node_up, Node, NodeType}, Monitors) ->
+ case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true -> {noreply, Monitors};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
@@ -209,9 +190,8 @@ handle_cast({node_up, Node, NodeType}, State) ->
ram -> DiscNodes
end,
add_node(Node, RunningNodes)}),
- erlang:monitor(process, {rabbit, Node}),
ok = handle_live_rabbit(Node),
- {noreply, State}
+ {noreply, pmon:monitor({rabbit, Node}, Monitors)}
end;
handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
@@ -230,12 +210,12 @@ handle_cast({left_cluster, Node}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, pmon:erase({rabbit, Node}, Monitors)};
handle_info(_Info, State) ->
{noreply, State}.
@@ -266,33 +246,22 @@ handle_live_rabbit(Node) ->
%% Internal utils
%%--------------------------------------------------------------------
-cluster_multicall(Fun, Args) ->
- Node = node(),
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [Node],
- %% notify other rabbits of this cluster
- case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
- ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
- Nodes.
-
-is_already_monitored(Item) ->
- {monitors, Monitors} = process_info(self(), monitors),
- lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
- (_) -> false
- end, Monitors).
+try_read_file(FileName) ->
+ case rabbit_file:read_term_file(FileName) of
+ {ok, Term} -> {ok, Term};
+ {error, enoent} -> {error, enoent};
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
+ end.
legacy_cluster_nodes(Nodes) ->
- %% We get all the info that we can, including the nodes from mnesia, which
- %% will be there if the node is a disc node (empty list otherwise)
+ %% We get all the info that we can, including the nodes from
+ %% mnesia, which will be there if the node is a disc node (empty
+ %% list otherwise)
lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
legacy_should_be_disc_node(DiscNodes) ->
DiscNodes == [] orelse lists:member(node(), DiscNodes).
-add_node(Node, Nodes) ->
- lists:usort([Node|Nodes]).
+add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
-del_node(Node, Nodes) ->
- Nodes -- [Node].
+del_node(Node, Nodes) -> Nodes -- [Node].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 85172461..df0ee721 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -656,7 +656,6 @@ test_topic_expect_match(X, List) ->
#'P_basic'{}, <<>>),
Res = rabbit_exchange_type_topic:route(
X, #delivery{mandatory = false,
- immediate = false,
sender = self(),
message = Message}),
ExpectedRes = lists:map(
@@ -2194,8 +2193,8 @@ publish_and_confirm(Q, Payload, Count) ->
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{delivery_mode = 2},
Payload),
- Delivery = #delivery{mandatory = false, immediate = false,
- sender = self(), message = Msg, msg_seq_no = Seq},
+ Delivery = #delivery{mandatory = false, sender = self(),
+ message = Msg, msg_seq_no = Seq},
{routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 8966bcab..f488afb4 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -69,7 +69,6 @@
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
- immediate :: boolean(),
sender :: pid(),
message :: message()}).
-type(message_properties() ::