diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-26 14:06:10 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-26 14:06:10 +0100 |
commit | b5e92f44d67620e008b8eaabaeadc922e76a763e (patch) | |
tree | 788103ba155053ffcf141406492a64585d2f376c | |
parent | c372383769afd21d27bf9be00fd521d83bf664bb (diff) | |
parent | e6c877d3cdb6ca369d4d9c94af6187794affaac0 (diff) | |
download | rabbitmq-server-bug25179.tar.gz |
merge default into bug25179bug25179
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/app_utils.erl | 41 | ||||
-rw-r--r-- | src/pmon.erl | 32 | ||||
-rw-r--r-- | src/rabbit.erl | 42 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 42 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 32 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 8 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 23 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 2 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 15 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 149 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 56 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 161 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 5 | ||||
-rw-r--r-- | src/rabbit_types.erl | 1 |
19 files changed, 299 insertions, 355 deletions
@@ -1 +1 @@ -Please see http://www.rabbitmq.com/build-server.html for build instructions. +Please see http://www.rabbitmq.com/build-server.html for build instructions.
\ No newline at end of file diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d6fac46d..fff92205 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -73,7 +73,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, sender, message, msg_seq_no}). +-record(delivery, {mandatory, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/src/app_utils.erl b/src/app_utils.erl index 4bef83a5..fdf6ed41 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -15,17 +15,21 @@ %% -module(app_utils). --export([load_applications/1, start_applications/1, - stop_applications/1, app_dependency_order/2, +-export([load_applications/1, start_applications/1, start_applications/2, + stop_applications/1, stop_applications/2, app_dependency_order/2, wait_for_applications/1]). -ifdef(use_specs). --spec load_applications([atom()]) -> 'ok'. --spec start_applications([atom()]) -> 'ok'. --spec stop_applications([atom()]) -> 'ok'. --spec wait_for_applications([atom()]) -> 'ok'. --spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. +-type error_handler() :: fun((atom(), any()) -> 'ok'). + +-spec load_applications([atom()]) -> 'ok'. +-spec start_applications([atom()]) -> 'ok'. +-spec stop_applications([atom()]) -> 'ok'. +-spec start_applications([atom()], error_handler()) -> 'ok'. +-spec stop_applications([atom()], error_handler()) -> 'ok'. +-spec wait_for_applications([atom()]) -> 'ok'. +-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. -endif. @@ -37,21 +41,34 @@ load_applications(Apps) -> ok. start_applications(Apps) -> + start_applications( + Apps, fun (App, Reason) -> + throw({error, {cannot_start_application, App, Reason}}) + end). + +stop_applications(Apps) -> + stop_applications( + Apps, fun (App, Reason) -> + throw({error, {cannot_stop_application, App, Reason}}) + end). + +start_applications(Apps, ErrorHandler) -> manage_applications(fun lists:foldl/3, fun application:start/1, fun application:stop/1, already_started, - cannot_start_application, + ErrorHandler, Apps). -stop_applications(Apps) -> +stop_applications(Apps, ErrorHandler) -> manage_applications(fun lists:foldr/3, fun application:stop/1, fun application:start/1, not_started, - cannot_stop_application, + ErrorHandler, Apps). + wait_for_applications(Apps) -> [wait_for_application(App) || App <- Apps], ok. @@ -107,14 +124,14 @@ app_dependencies(App) -> {ok, Lst} -> Lst end. -manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> +manage_applications(Iterate, Do, Undo, SkipError, ErrorHandler, Apps) -> Iterate(fun (App, Acc) -> case Do(App) of ok -> [App | Acc]; {error, {SkipError, _}} -> Acc; {error, Reason} -> lists:foreach(Undo, Acc), - throw({error, {ErrorTag, App, Reason}}) + ErrorHandler(App, Reason) end end, [], Apps), ok. diff --git a/src/pmon.erl b/src/pmon.erl index 45786577..1aeebb72 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -27,37 +27,39 @@ -opaque(?MODULE() :: dict()). +-type(item() :: pid() | {atom(), node()}). + -spec(new/0 :: () -> ?MODULE()). --spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). --spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()). --spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). --spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). --spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). --spec(monitored/1 :: (?MODULE()) -> [pid()]). +-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()). +-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). +-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). +-spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()). +-spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()). +-spec(monitored/1 :: (?MODULE()) -> [item()]). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -endif. new() -> dict:new(). -monitor(Pid, M) -> - case dict:is_key(Pid, M) of +monitor(Item, M) -> + case dict:is_key(Item, M) of true -> M; - false -> dict:store(Pid, erlang:monitor(process, Pid), M) + false -> dict:store(Item, erlang:monitor(process, Item), M) end. -monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids). +monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). -demonitor(Pid, M) -> - case dict:find(Pid, M) of +demonitor(Item, M) -> + case dict:find(Item, M) of {ok, MRef} -> erlang:demonitor(MRef), - dict:erase(Pid, M); + dict:erase(Item, M); error -> M end. -is_monitored(Pid, M) -> dict:is_key(Pid, M). +is_monitored(Item, M) -> dict:is_key(Item, M). -erase(Pid, M) -> dict:erase(Pid, M). +erase(Item, M) -> dict:erase(Item, M). monitored(M) -> dict:fetch_keys(M). diff --git a/src/rabbit.erl b/src/rabbit.erl index afa97ddc..7b417b00 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -303,7 +303,8 @@ start() -> ok = rabbit_node_monitor:prepare_cluster_status_files(), ok = rabbit_mnesia:check_cluster_consistency(), ok = ensure_working_log_handlers(), - ok = app_utils:start_applications(app_startup_order()), + ok = app_utils:start_applications( + app_startup_order(), fun handle_app_error/2), ok = print_plugin_info(rabbit_plugins:active()) end). @@ -323,10 +324,17 @@ boot() -> ok = app_utils:load_applications(ToBeLoaded), StartupApps = app_utils:app_dependency_order(ToBeLoaded, false), - ok = app_utils:start_applications(StartupApps), + ok = app_utils:start_applications( + StartupApps, fun handle_app_error/2), ok = print_plugin_info(Plugins) end). +handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> + boot_error({could_not_start, App, Reason}, not_available); + +handle_app_error(App, Reason) -> + boot_error({could_not_start, App, Reason}, not_available). + start_it(StartFun) -> try StartFun() @@ -450,7 +458,7 @@ run_boot_step({StepName, Attributes}) -> [try apply(M,F,A) catch - _:Reason -> boot_step_error(Reason, erlang:get_stacktrace()) + _:Reason -> boot_error(Reason, erlang:get_stacktrace()) end || {M,F,A} <- MFAs], io:format("done~n"), ok @@ -489,14 +497,14 @@ sort_boot_steps(UnsortedSteps) -> {mfa, {M,F,A}} <- Attributes, not erlang:function_exported(M, F, length(A))] of [] -> SortedSteps; - MissingFunctions -> boot_error( + MissingFunctions -> basic_boot_error( "Boot step functions not exported: ~p~n", [MissingFunctions]) end; {error, {vertex, duplicate, StepName}} -> - boot_error("Duplicate boot step name: ~w~n", [StepName]); + basic_boot_error("Duplicate boot step name: ~w~n", [StepName]); {error, {edge, Reason, From, To}} -> - boot_error( + basic_boot_error( "Could not add boot step dependency of ~w on ~w:~n~s", [To, From, case Reason of @@ -510,7 +518,7 @@ sort_boot_steps(UnsortedSteps) -> end]) end. -boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> +boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> AllNodes = rabbit_mnesia:cluster_nodes(all), {Err, Nodes} = case AllNodes -- [node()] of @@ -521,15 +529,19 @@ boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> "Timeout contacting cluster nodes: ~p.~n", [Ns]), Ns} end, - boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); - -boot_step_error(Reason, Stacktrace) -> - boot_error("Error description:~n ~p~n~n" - "Log files (may contain more information):~n ~s~n ~s~n~n" - "Stack trace:~n ~p~n~n", - [Reason, log_location(kernel), log_location(sasl), Stacktrace]). + basic_boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); + +boot_error(Reason, Stacktrace) -> + Fmt = "Error description:~n ~p~n~n" + "Log files (may contain more information):~n ~s~n ~s~n~n", + Args = [Reason, log_location(kernel), log_location(sasl)], + case Stacktrace of + not_available -> basic_boot_error(Fmt, Args); + _ -> basic_boot_error(Fmt ++ "Stack trace:~n ~p~n~n", + Args ++ [Stacktrace]) + end. -boot_error(Format, Args) -> +basic_boot_error(Format, Args) -> io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), error_logger:error_msg(Format, Args), timer:sleep(1000), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b2473f91..4a20a1bc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -60,7 +60,7 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(routing_result() :: 'routed' | 'unroutable'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -650,18 +650,17 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. -deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> +deliver([], #delivery{mandatory = false}, _Flow) -> %% /dev/null optimisation {routed, []}; -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. +deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> + %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver + %% will deliver the message to the queue process asynchronously, + %% and return true, which means all the QPids will always be + %% returned. It is therefore safe to use a fire-and-forget cast + %% here and return the QPids - the semantics is preserved. This + %% scales much better than the case below. QPids = qpids(Qs), case Flow of flow -> [credit_flow:send(QPid) || QPid <- QPids]; @@ -673,21 +672,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> end), {routed, QPids}; -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, - _Flow) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} +deliver(Qs, Delivery, _Flow) -> + case delegate:invoke( + qpids(Qs), fun (QPid) -> + ok = gen_server2:call(QPid, {deliver, Delivery}, + infinity) + end) of + {[], _} -> {unroutable, []}; + {R , _} -> {routed, [QPid || {QPid, ok} <- R]} end. qpids(Qs) -> lists:append([[QPid | SPids] || diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e647627c..0e3f0bac 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -767,7 +767,7 @@ dead_letter_fun(Reason, _State) -> dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> DLMsg = make_dead_letter_msg(Reason, Msg, State), - Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), @@ -1029,27 +1029,9 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> - %% FIXME: Is this correct semantics? - %% - %% I'm worried in particular about the case where an exchange has - %% two queues against a particular routing key, and a message is - %% sent in immediate mode through the binding. In non-immediate - %% mode, both queues get the message, saving it for later if - %% there's noone ready to receive it just now. In immediate mode, - %% should both queues still get the message, somehow, or should - %% just all ready-to-consume queues get the message, with unready - %% queues discarding the message? - %% - Confirm = should_confirm_message(Delivery, State), - {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), - reply(Delivered, case Delivered of - true -> maybe_record_confirm_message(Confirm, State1); - false -> discard_delivery(Delivery, State1) - end); - -handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> - gen_server2:reply(From, true), +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), noreply(deliver_or_enqueue(Delivery, State)); handle_call({notify_down, ChPid}, From, State) -> @@ -1195,7 +1177,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State = #q{senders = Senders}) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of flow -> credit_flow:ack(Sender), pmon:monitor(Sender, Senders); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 734456d3..db2b7e95 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,9 +18,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/4, publish/6, publish/1, +-export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, map_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/3, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -40,13 +40,13 @@ -spec(publish/4 :: (exchange_input(), rabbit_router:routing_key(), properties_input(), body_input()) -> publish_result()). --spec(publish/6 :: - (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), +-spec(publish/5 :: + (exchange_input(), rabbit_router:routing_key(), boolean(), properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: - (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> +-spec(delivery/3 :: + (boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -80,18 +80,16 @@ %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, Properties, Body). + publish(Exchange, RoutingKeyBin, false, Properties, Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> - publish(X, delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)); -publish(XName, RKey, Mandatory, Immediate, Props, Body) -> - publish(delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)). +publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(X, delivery(Mandatory, Message, undefined)); +publish(XName, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(delivery(Mandatory, Message, undefined)). publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> @@ -105,8 +103,8 @@ publish(X, Delivery) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), {ok, RoutingRes, DeliveredQPids}. -delivery(Mandatory, Immediate, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), +delivery(Mandatory, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 2e462354..0d23f716 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -169,9 +169,9 @@ add(Binding, InnerFun) -> add(Src, Dst, B) -> [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], - case (not (SrcDurable andalso DstDurable) orelse - mnesia:read({rabbit_durable_route, B}) =:= []) of - true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, + case (SrcDurable andalso DstDurable andalso + mnesia:read({rabbit_durable_route, B}) =/= []) of + false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), x_callback(transaction, Src, add_binding, B), Serial = rabbit_exchange:serial(Src), @@ -179,7 +179,7 @@ add(Src, Dst, B) -> x_callback(Serial, Src, add_binding, B), ok = rabbit_event:notify(binding_created, info(B)) end; - false -> rabbit_misc:const({error, binding_not_found}) + true -> rabbit_misc:const({error, binding_not_found}) end. remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e50e823c..0d13312b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -136,7 +136,7 @@ flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). list() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_channel, list_local, []). list_local() -> @@ -598,10 +598,12 @@ handle_method(_Method, _, #ch{tx_status = TxStatus}) handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; +handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> + rabbit_misc:protocol_error(not_implemented, "immediate=true", []); + handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, + mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, tx_status = TxStatus, confirm_enabled = ConfirmEnabled, @@ -623,8 +625,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_trace_in(Message, TraceState), - Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, - MsgSeqNo), + Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), {noreply, case TxStatus of @@ -1342,20 +1343,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QPid <- DeliveredQPids]], publish, State2), State2. -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); -process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{XName, 1}], return_not_delivered, State), - record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, State#ch.unconfirmed)}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index a6c4fe67..e75e1f6f 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -461,7 +461,7 @@ action(list_parameters, Node, [], Opts, Inform) -> action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || - N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []), + N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]), Action <- [status, cluster_status, environment]], VHosts = unsafe_rpc(Node, rabbit_vhost, list, []), [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES], diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index a669a2b3..689e5d83 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -31,7 +31,8 @@ -spec(force_event_refresh/0 :: () -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). --spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user()), +-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() | + {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> {'ok', {rabbit_types:user(), @@ -59,7 +60,7 @@ list_local() -> pg_local:get_members(rabbit_direct). list() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_direct, list_local, []). %%---------------------------------------------------------------------------- @@ -74,10 +75,17 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) -> {error, access_refused} end; +connect({Username, Password}, VHost, Protocol, Pid, Infos) -> + connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid, + Infos); + connect(Username, VHost, Protocol, Pid, Infos) -> + connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos). + +connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of true -> - case rabbit_access_control:check_user_login(Username, []) of + case rabbit_access_control:FunctionName(U, P) of {ok, User} -> connect(User, VHost, Protocol, Pid, Infos); {refused, _M, _A} -> {error, auth_failure} end; @@ -85,6 +93,7 @@ connect(Username, VHost, Protocol, Pid, Infos) -> {error, broker_not_found_on_node} end. + start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index f1672f4e..a9af2d8a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -81,7 +81,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(LogExch, RoutingKey, false, false, + rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c11a8ff7..41389815 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -87,12 +87,11 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = - (case MNodes of - all -> rabbit_mnesia:all_clustered_nodes(); - undefined -> []; - _ -> MNodes - end) -- [node()], + MNodes1 = (case MNodes of + all -> rabbit_mnesia:cluster_nodes(all); + undefined -> []; + _ -> MNodes + end) -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3e45f026..039b2749 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -70,7 +70,7 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag ack_num, @@ -167,27 +167,10 @@ init_it(Self, Node, QueueName) -> end end. -handle_call({deliver, Delivery = #delivery { immediate = true }}, - From, State) -> - %% It is safe to reply 'false' here even if a) we've not seen the - %% msg via gm, or b) the master dies before we receive the msg via - %% gm. In the case of (a), we will eventually receive the msg via - %% gm, and it's only the master's result to the channel that is - %% important. In the case of (b), if the master does die and we do - %% get promoted then at that point we have no consumers, thus - %% 'false' is precisely the correct answer. However, we must be - %% careful to _not_ enqueue the message in this case. - - %% Note this is distinct from the case where we receive the msg - %% via gm first, then we're promoted to master, and only then do - %% we receive the msg from the channel. - gen_server2:reply(From, false), %% master may deliver it, not us - noreply(maybe_enqueue_message(Delivery, false, State)); - -handle_call({deliver, Delivery = #delivery { mandatory = true }}, - From, State) -> - gen_server2:reply(From, true), %% amqqueue throws away the result anyway - noreply(maybe_enqueue_message(Delivery, true, State)); +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), + noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, @@ -232,12 +215,12 @@ handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + %% Asynchronous, non-"mandatory", deliver mode. case Flow of flow -> credit_flow:ack(Sender); noflow -> ok end, - noreply(maybe_enqueue_message(Delivery, true, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -554,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - {Delivery, true} <- queue:to_list(PubQ)], + Delivery <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, Deliveries, KS, MTC), @@ -655,14 +638,13 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, sender = ChPid }, - EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; {ok, {confirmed, ChPid}} -> @@ -732,10 +714,9 @@ process_instruction( {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, {published, ChPid}, MS)}; - {{value, {Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. @@ -747,7 +728,7 @@ process_instruction( ChPid, [MsgSeqNo]), MS end}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly @@ -784,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {empty, _MQ} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, discarded, MS)}; - {{value, {#delivery { message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, #delivery { message = #basic_message { id = MsgId } }}, + MQ2} -> %% We've already seen it from the channel, we're not %% going to see this again, so don't add it to MS {MQ2, PendingCh, MS}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly @@ -819,27 +800,23 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State, lists:duplicate(ToDrop, const)), {ok, case AckRequired of true -> State1; - false -> set_synchronised(ToDrop - Dropped, State1) + false -> update_delta(ToDrop - Dropped, State1) end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {State1, Delta} = - case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - {maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }), - 0}; + {ok, case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }); + _ when QLen =< Remaining andalso AckRequired -> + State; _ when QLen =< Remaining -> - {State, case AckRequired of - true -> 0; - false -> -1 - end} - end, - {ok, set_synchronised(Delta, State1)}; + update_delta(-1, State) + end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -847,9 +824,9 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, set_synchronised(length(MsgIds1) - length(MsgIds), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 })}; + {ok, update_delta(length(MsgIds1) - length(MsgIds), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -878,8 +855,8 @@ process_instruction({sender_death, ChPid}, process_instruction({depth, Depth}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - {ok, set_synchronised( - 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })}; + {ok, set_delta(Depth - BQ:depth(BQS), State)}; + process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -905,38 +882,34 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(Delta, State) -> - set_synchronised(Delta, false, State). +set_delta(0, State = #state { depth_delta = undefined }) -> + ok = record_synchronised(State#state.q), + State #state { depth_delta = 0 }; +set_delta(NewDelta, State = #state { depth_delta = undefined }) -> + true = NewDelta > 0, %% assertion + State #state { depth_delta = NewDelta }; +set_delta(NewDelta, State = #state { depth_delta = Delta }) -> + update_delta(NewDelta - Delta, State). -set_synchronised(_Delta, _AddAnyway, - State = #state { depth_delta = undefined }) -> +update_delta(_DeltaChange, State = #state { depth_delta = undefined }) -> State; -set_synchronised(Delta, AddAnyway, - State = #state { depth_delta = DepthDelta, - q = #amqqueue { name = QName }}) -> - DepthDelta1 = DepthDelta + Delta, - %% We intentionally leave out the head where a slave becomes - %% unsynchronised: we assert that can never happen. - %% The `AddAnyway' param is there since in the `depth' instruction we - %% receive the master depth for the first time, and we want to set the sync - %% state anyway if we are synced. - case DepthDelta1 =:= 0 of - true when not (DepthDelta =:= 0) orelse AddAnyway -> - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> - %% We might be there already, in the `AddAnyway' - %% case - SSPids1 = SSPids -- [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) - end - end); - _ when DepthDelta1 >= 0 -> - ok - end, - State #state { depth_delta = DepthDelta1 }. +update_delta( DeltaChange, State = #state { depth_delta = 0 }) -> + 0 = DeltaChange, %% assertion: we cannot become unsync'ed + State; +update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> + true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed + set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). + +record_synchronised(#amqqueue { name = QName }) -> + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q = #amqqueue { sync_slave_pids = SSPids }] -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { sync_slave_pids = [Self | SSPids] }), + ok + end + end). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f7a355be..bfecf06a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -263,20 +263,16 @@ forget_cluster_node(Node, RemoveWhenOffline) -> true -> ok; false -> e(not_a_cluster_node) end, - case mnesia:system_info(is_running) of - no when RemoveWhenOffline -> - remove_node_offline_node(Node); - yes when RemoveWhenOffline -> - e(online_node_offline_flag); - no -> - e(offline_node_no_offline_flag); - yes -> - rabbit_misc:local_info_msg("Removing node ~p from cluster~n", - [Node]), - case remove_node_if_mnesia_running(Node) of - ok -> ok; - {error, _} = Err -> throw(Err) - end + case {RemoveWhenOffline, mnesia:system_info(is_running)} of + {true, no} -> remove_node_offline_node(Node); + {true, yes} -> e(online_node_offline_flag); + {false, no} -> e(offline_node_no_offline_flag); + {false, yes} -> rabbit_misc:local_info_msg( + "Removing node ~p from cluster~n", [Node]), + case remove_node_if_mnesia_running(Node) of + ok -> ok; + {error, _} = Err -> throw(Err) + end end. remove_node_offline_node(Node) -> @@ -324,6 +320,8 @@ status() -> is_clustered() -> AllNodes = cluster_nodes(all), AllNodes =/= [] andalso AllNodes =/= [node()]. +cluster_nodes(WhichNodes) -> cluster_status(WhichNodes). + %% This function is the actual source of information, since it gets %% the data from mnesia. Obviously it'll work only when mnesia is %% running. @@ -352,7 +350,7 @@ mnesia_nodes() -> end end. -cluster_nodes(WhichNodes) -> +cluster_status(WhichNodes) -> %% I don't want to call `running_nodes/1' unless if necessary, since it's %% pretty expensive. {AllNodes1, DiscNodes1, RunningNodesThunk} = @@ -556,9 +554,9 @@ check_cluster_consistency(Node) -> %%-------------------------------------------------------------------- on_node_up(Node) -> - case running_disc_nodes() =:= [Node] of - true -> rabbit_log:info("cluster contains disc nodes again~n"); - false -> ok + case running_disc_nodes() of + [Node] -> rabbit_log:info("cluster contains disc nodes again~n"); + _ -> ok end. on_node_down(_Node) -> @@ -568,7 +566,7 @@ on_node_down(_Node) -> end. running_disc_nodes() -> - {_AllNodes, DiscNodes, RunningNodes} = cluster_nodes(status), + {_AllNodes, DiscNodes, RunningNodes} = cluster_status(status), ordsets:to_list(ordsets:intersection(ordsets:from_list(DiscNodes), ordsets:from_list(RunningNodes))). @@ -583,18 +581,16 @@ discover_cluster(Nodes) when is_list(Nodes) -> discover_cluster(Node) -> OfflineError = {error, {cannot_discover_cluster, - "The nodes provided is either offline or not running"}}, + "The nodes provided are either offline or not running"}}, case node() of - Node-> - {error, {cannot_discover_cluster, - "You provided the current node as node to cluster with"}}; - _ -> - case rpc:call(Node, - rabbit_mnesia, cluster_status_from_mnesia, []) of - {badrpc, _Reason} -> OfflineError; - {error, mnesia_not_running} -> OfflineError; - {ok, Res} -> {ok, Res} - end + Node -> {error, {cannot_discover_cluster, + "Cannot cluster node with itself"}}; + _ -> case rpc:call(Node, + rabbit_mnesia, cluster_status_from_mnesia, []) of + {badrpc, _Reason} -> OfflineError; + {error, mnesia_not_running} -> OfflineError; + {ok, Res} -> {ok, Res} + end end. schema_ok_or_move() -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index c1572762..026aa362 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -18,29 +18,16 @@ -behaviour(gen_server). +-export([start_link/0]). -export([running_nodes_filename/0, - cluster_status_filename/0, - prepare_cluster_status_files/0, - write_cluster_status/1, - read_cluster_status/0, - update_cluster_status/0, - reset_cluster_status/0, - - joined_cluster/2, - notify_joined_cluster/0, - left_cluster/1, - notify_left_cluster/1, - node_up/2, - notify_node_up/0, - - start_link/0, - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 - ]). + cluster_status_filename/0, prepare_cluster_status_files/0, + write_cluster_status/1, read_cluster_status/0, + update_cluster_status/0, reset_cluster_status/0]). +-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -49,6 +36,8 @@ -ifdef(use_specs). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). + -spec(running_nodes_filename/0 :: () -> string()). -spec(cluster_status_filename/0 :: () -> string()). -spec(prepare_cluster_status_files/0 :: () -> 'ok'). @@ -57,27 +46,31 @@ -spec(update_cluster_status/0 :: () -> 'ok'). -spec(reset_cluster_status/0 :: () -> 'ok'). --spec(joined_cluster/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_node_up/0 :: () -> 'ok'). -spec(notify_joined_cluster/0 :: () -> 'ok'). --spec(left_cluster/1 :: (node()) -> 'ok'). -spec(notify_left_cluster/1 :: (node()) -> 'ok'). --spec(node_up/2 :: (node(), boolean()) -> 'ok'). --spec(notify_node_up/0 :: () -> 'ok'). -endif. %%---------------------------------------------------------------------------- +%% Start +%%---------------------------------------------------------------------------- + +start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%---------------------------------------------------------------------------- %% Cluster file operations %%---------------------------------------------------------------------------- -%% The cluster file information is kept in two files. The "cluster status file" -%% contains all the clustered nodes and the disc nodes. The "running nodes -%% file" contains the currently running nodes or the running nodes at shutdown -%% when the node is down. +%% The cluster file information is kept in two files. The "cluster +%% status file" contains all the clustered nodes and the disc nodes. +%% The "running nodes file" contains the currently running nodes or +%% the running nodes at shutdown when the node is down. %% -%% We strive to keep the files up to date and we rely on this assumption in -%% various situations. Obviously when mnesia is offline the information we have -%% will be outdated, but it can't be otherwise. +%% We strive to keep the files up to date and we rely on this +%% assumption in various situations. Obviously when mnesia is offline +%% the information we have will be outdated, but it cannot be +%% otherwise. running_nodes_filename() -> filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). @@ -94,8 +87,8 @@ prepare_cluster_status_files() -> {error, enoent} -> [] end, ThisNode = [node()], - %% The running nodes file might contain a set or a list, in case of the - %% legacy file + %% The running nodes file might contain a set or a list, in case + %% of the legacy file RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1), {AllNodes1, WantDiscNode} = case try_read_file(cluster_status_filename()) of @@ -131,13 +124,6 @@ write_cluster_status({All, Disc, Running}) -> {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}}) end. -try_read_file(FileName) -> - case rabbit_file:read_term_file(FileName) of - {ok, Term} -> {ok, Term}; - {error, enoent} -> {error, enoent}; - {error, E} -> throw({error, {cannot_read_file, FileName, E}}) - end. - read_cluster_status() -> case {try_read_file(cluster_status_filename()), try_read_file(running_nodes_filename())} of @@ -158,49 +144,44 @@ reset_cluster_status() -> %% Cluster notifications %%---------------------------------------------------------------------------- -joined_cluster(Node, IsDiscNode) -> - gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}). +notify_node_up() -> + Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + gen_server:abcast(Nodes, ?SERVER, + {node_up, node(), rabbit_mnesia:node_type()}), + %% register other active rabbits with this rabbit + DiskNodes = rabbit_mnesia:cluster_nodes(disc), + [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of + true -> disc; + false -> ram + end}) || N <- Nodes], + ok. notify_joined_cluster() -> - cluster_multicall(joined_cluster, [node(), rabbit_mnesia:node_type()]), + Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + gen_server:abcast(Nodes, ?SERVER, + {joined_cluster, node(), rabbit_mnesia:node_type()}), ok. -left_cluster(Node) -> - gen_server:cast(?SERVER, {left_cluster, Node}). - notify_left_cluster(Node) -> - left_cluster(Node), - cluster_multicall(left_cluster, [Node]), - ok. - -node_up(Node, IsDiscNode) -> - gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}). - -notify_node_up() -> - Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:node_type()]), - %% register other active rabbits with this rabbit - [ node_up(N, lists:member(N, rabbit_mnesia:cluster_nodes(disc))) || - N <- Nodes ], + Nodes = rabbit_mnesia:cluster_nodes(running), + gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}), ok. %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -init([]) -> - {ok, no_state}. +init([]) -> {ok, pmon:new()}. handle_call(_Request, _From, State) -> {noreply, State}. -%% Note: when updating the status file, we can't simply write the mnesia -%% information since the message can (and will) overtake the mnesia propagation. -handle_cast({node_up, Node, NodeType}, State) -> - case is_already_monitored({rabbit, Node}) of - true -> {noreply, State}; +%% Note: when updating the status file, we can't simply write the +%% mnesia information since the message can (and will) overtake the +%% mnesia propagation. +handle_cast({node_up, Node, NodeType}, Monitors) -> + case pmon:is_monitored({rabbit, Node}, Monitors) of + true -> {noreply, Monitors}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({add_node(Node, AllNodes), @@ -209,9 +190,8 @@ handle_cast({node_up, Node, NodeType}, State) -> ram -> DiscNodes end, add_node(Node, RunningNodes)}), - erlang:monitor(process, {rabbit, Node}), ok = handle_live_rabbit(Node), - {noreply, State} + {noreply, pmon:monitor({rabbit, Node}, Monitors)} end; handle_cast({joined_cluster, Node, NodeType}, State) -> {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), @@ -230,12 +210,12 @@ handle_cast({left_cluster, Node}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, State}; + {noreply, pmon:erase({rabbit, Node}, Monitors)}; handle_info(_Info, State) -> {noreply, State}. @@ -266,33 +246,22 @@ handle_live_rabbit(Node) -> %% Internal utils %%-------------------------------------------------------------------- -cluster_multicall(Fun, Args) -> - Node = node(), - Nodes = rabbit_mnesia:cluster_nodes(running) -- [Node], - %% notify other rabbits of this cluster - case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args, - ?RABBIT_UP_RPC_TIMEOUT) of - {_, [] } -> ok; - {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) - end, - Nodes. - -is_already_monitored(Item) -> - {monitors, Monitors} = process_info(self(), monitors), - lists:any(fun ({_, Item1}) when Item =:= Item1 -> true; - (_) -> false - end, Monitors). +try_read_file(FileName) -> + case rabbit_file:read_term_file(FileName) of + {ok, Term} -> {ok, Term}; + {error, enoent} -> {error, enoent}; + {error, E} -> throw({error, {cannot_read_file, FileName, E}}) + end. legacy_cluster_nodes(Nodes) -> - %% We get all the info that we can, including the nodes from mnesia, which - %% will be there if the node is a disc node (empty list otherwise) + %% We get all the info that we can, including the nodes from + %% mnesia, which will be there if the node is a disc node (empty + %% list otherwise) lists:usort(Nodes ++ mnesia:system_info(db_nodes)). legacy_should_be_disc_node(DiscNodes) -> DiscNodes == [] orelse lists:member(node(), DiscNodes). -add_node(Node, Nodes) -> - lists:usort([Node|Nodes]). +add_node(Node, Nodes) -> lists:usort([Node | Nodes]). -del_node(Node, Nodes) -> - Nodes -- [Node]. +del_node(Node, Nodes) -> Nodes -- [Node]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 85172461..df0ee721 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -656,7 +656,6 @@ test_topic_expect_match(X, List) -> #'P_basic'{}, <<>>), Res = rabbit_exchange_type_topic:route( X, #delivery{mandatory = false, - immediate = false, sender = self(), message = Message}), ExpectedRes = lists:map( @@ -2194,8 +2193,8 @@ publish_and_confirm(Q, Payload, Count) -> Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = 2}, Payload), - Delivery = #delivery{mandatory = false, immediate = false, - sender = self(), message = Msg, msg_seq_no = Seq}, + Delivery = #delivery{mandatory = false, sender = self(), + message = Msg, msg_seq_no = Seq}, {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 8966bcab..f488afb4 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -69,7 +69,6 @@ -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), - immediate :: boolean(), sender :: pid(), message :: message()}). -type(message_properties() :: |