author    Simon MacMullen <simon@rabbitmq.com>  2012-09-21 15:26:23 +0100
committer Simon MacMullen <simon@rabbitmq.com>  2012-09-21 15:26:23 +0100
commit    9724e6f4f5ce034e892eadff1073e893e99b11c0 (patch)
tree      38728cd2c6fee9fb7e19b306626311ac4042feaf
parent    ddf7763729950ad71d782818b3bb105f95e458e9 (diff)
parent    a26dd141e76acda4f6d7f737d2f472cd21afe558 (diff)

Merge bug23896

 Makefile                           |   2
 docs/rabbitmqctl.1.xml             |  27
 src/file_handle_cache.erl          |  59
 src/gm.erl                         |  33
 src/rabbit_amqqueue.erl            |   7
 src/rabbit_amqqueue_process.erl    |   7
 src/rabbit_auth_backend.erl        |   2
 src/rabbit_auth_mechanism.erl      |   2
 src/rabbit_binding.erl             |  58
 src/rabbit_direct.erl              |  13
 src/rabbit_exchange.erl            |   7
 src/rabbit_exchange_decorator.erl  |   2
 src/rabbit_exchange_type.erl       |   2
 src/rabbit_file.erl                |   4
 src/rabbit_mirror_queue_master.erl |   5
 src/rabbit_mnesia.erl              | 500
 16 files changed, 374 insertions, 356 deletions
diff --git a/Makefile b/Makefile
index f3729cfa..c63e3dfd 100644
--- a/Makefile
+++ b/Makefile
@@ -147,7 +147,7 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
dialyzer --plt $(BASIC_PLT) --no_native --fullpath \
- -Wrace_conditions $(BEAM_TARGETS)
+ $(BEAM_TARGETS)
# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
create-plt: $(RABBIT_PLT)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 1af93e85..11d85e9e 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -366,16 +366,8 @@
</para>
</listitem>
</varlistentry>
- </variablelist>
- <variablelist>
<varlistentry>
- <term>
- <cmdsynopsis>
- <command>change_cluster_node_type</command>
- <arg choice="req">
- disk | ram
- </arg>
- </cmdsynopsis>
+ <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disk | ram</arg></cmdsynopsis>
</term>
<listitem>
<para>
@@ -390,15 +382,8 @@
</para>
</listitem>
</varlistentry>
- </variablelist>
- <variablelist>
<varlistentry>
- <term>
- <cmdsynopsis>
- <command>forget_cluster_node</command>
- <arg choice="opt">--offline</arg>
- </cmdsynopsis>
- </term>
+ <term><cmdsynopsis><command>forget_cluster_node</command> <arg choice="opt">--offline</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -429,14 +414,8 @@
</para>
</listitem>
</varlistentry>
- </variablelist>
- <variablelist>
<varlistentry>
- <term>
- <cmdsynopsis>
- <command>update_cluster_nodes</command>
- <arg choice="req">clusternode</arg>
- </cmdsynopsis>
+ <term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis>
</term>
<listitem>
<variablelist>
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 68c095d2..3260d369 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -120,12 +120,12 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain, release and transfer. obtain/0
+%% The server also supports obtain, release and transfer. obtain/{0,1}
%% blocks until a file descriptor is available, at which point the
-%% requesting process is considered to 'own' one more
-%% descriptor. release/0 is the inverse operation and releases a
-%% previously obtained descriptor. transfer/1 transfers ownership of a
-%% file descriptor between processes. It is non-blocking. Obtain has a
+%% requesting process is considered to 'own' that many more descriptors.
+%% release/{0,1} is the inverse operation and releases previously obtained
+%% descriptor(s). transfer/{1,2} transfers ownership of file descriptor(s)
+%% between processes. It is non-blocking. Obtain has a
%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
%% the entire limit, but will be evicted by obtain calls up to the
%% point at which no more obtain calls can be satisfied by the obtains
@@ -136,8 +136,8 @@
%% as sockets can do so in such a way that the overall number of open
%% file descriptors is managed.
%%
-%% The callers of register_callback/3, obtain/0, and the argument of
-%% transfer/1 are monitored, reducing the count of handles in use
+%% The callers of register_callback/3, obtain, and the argument of
+%% transfer are monitored, reducing the count of handles in use
%% appropriately when the processes terminate.
-behaviour(gen_server2).
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
+-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
+ set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
@@ -251,8 +252,11 @@
-spec(clear/1 :: (ref()) -> ok_or_error()).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(obtain/1 :: (non_neg_integer()) -> 'ok').
-spec(release/0 :: () -> 'ok').
+-spec(release/1 :: (non_neg_integer()) -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
+-spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -485,18 +489,22 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain() ->
+obtain() -> obtain(1).
+release() -> release(1).
+transfer(Pid) -> transfer(Pid, 1).
+
+obtain(Count) when Count > 0 ->
%% If the FHC isn't running, obtains succeed immediately.
case whereis(?SERVER) of
undefined -> ok;
- _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
+ _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity)
end.
-release() ->
- gen_server2:cast(?SERVER, {release, self()}).
+release(Count) when Count > 0 ->
+ gen_server2:cast(?SERVER, {release, Count, self()}).
-transfer(Pid) ->
- gen_server2:cast(?SERVER, {transfer, self(), Pid}).
+transfer(Pid, Count) when Count > 0 ->
+ gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).
set_limit(Limit) ->
gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
@@ -842,7 +850,7 @@ init([AlarmSet, AlarmClear]) ->
prioritise_cast(Msg, _State) ->
case Msg of
- {release, _} -> 5;
+ {release, _, _} -> 5;
_ -> 0
end.
@@ -875,11 +883,12 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients }) ->
+handle_call({obtain, N, Pid}, From, State = #fhc_state {
+ obtain_count = Count,
+ obtain_pending = Pending,
+ clients = Clients }) ->
ok = track_client(Pid, Clients),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Item = #pending { kind = obtain, pid = Pid, requested = N, from = From },
Enqueue = fun () ->
true = ets:update_element(Clients, Pid,
{#cstate.blocked, true}),
@@ -890,7 +899,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
case obtain_limit_reached(State) of
true -> Enqueue();
false -> case needs_reduce(State #fhc_state {
- obtain_count = Count + 1 }) of
+ obtain_count = Count + N }) of
true -> reduce(Enqueue());
false -> adjust_alarm(
State, run_pending_item(Item, State))
@@ -925,9 +934,9 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
-handle_cast({release, Pid}, State) ->
+handle_cast({release, N, Pid}, State) ->
{noreply, adjust_alarm(State, process_pending(
- update_counts(obtain, Pid, -1, State)))};
+ update_counts(obtain, Pid, -N, State)))};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
@@ -939,11 +948,11 @@ handle_cast({close, Pid, EldestUnusedSince},
{noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1, State)))};
-handle_cast({transfer, FromPid, ToPid}, State) ->
+handle_cast({transfer, N, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
- update_counts(obtain, ToPid, +1,
- update_counts(obtain, FromPid, -1, State)))}.
+ update_counts(obtain, ToPid, +N,
+ update_counts(obtain, FromPid, -N, State)))}.
handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
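
The counted obtain/release API above is used in a bracketing pattern:
acquire N descriptors up front, do the work, release on the way out. A
minimal sketch, assuming a hypothetical caller (only obtain/1 and
release/1 come from this patch):

    %% Acquire Count descriptors and always hand them back, even if
    %% Fun() throws - releasing in 'after' keeps the count balanced.
    with_handles(Count, Fun) ->
        ok = file_handle_cache:obtain(Count),
        try Fun()
        after ok = file_handle_cache:release(Count)
        end.

rabbit_file's with_fhc_handle/2, changed later in this diff, follows
exactly this shape.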
diff --git a/src/gm.erl b/src/gm.erl
index f88ed18f..90433e84 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -77,9 +77,13 @@
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
-%% group_members/1
-%% Provide the Pid. Returns a list of the current group members.
+%% info/1
+%% Provide the Pid. Returns a proplist with various facts, including
+%% the group name and the current group members.
%%
+%% forget_group/1
+%% Provide the group name. Removes its mnesia record. Makes no attempt
+%% to ensure the group is empty.
%%
%% Implementation Overview
%% -----------------------
@@ -373,7 +377,7 @@
-behaviour(gen_server2).
-export([create_tables/0, start_link/3, leave/1, broadcast/2,
- confirmed_broadcast/2, group_members/1]).
+ confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
@@ -431,7 +435,8 @@
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
--spec(group_members/1 :: (pid()) -> [pid()]).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
@@ -514,9 +519,15 @@ broadcast(Server, Msg) ->
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
-group_members(Server) ->
- gen_server2:call(Server, group_members, infinity).
+info(Server) ->
+ gen_server2:call(Server, info, infinity).
+forget_group(GroupName) ->
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
+ ok.
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
@@ -553,12 +564,16 @@ handle_call({confirmed_broadcast, Msg}, _From,
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
-handle_call(group_members, _From,
+handle_call(info, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
-handle_call(group_members, _From, State = #state { view = View }) ->
- reply(get_pids(alive_view_members(View)), State);
+handle_call(info, _From, State = #state { group_name = GroupName,
+ module = Module,
+ view = View }) ->
+ reply([{group_name, GroupName},
+ {module, Module},
+ {group_members, get_pids(alive_view_members(View))}], State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
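
Callers migrating from group_members/1 now pull the member list out of
the info/1 proplist. A sketch under the new API (GmPid is a placeholder
for a gm process pid; info/1 and forget_group/1 are from this patch):

    %% info/1 returns a proplist; group_name and group_members are
    %% among its keys.
    Info = gm:info(GmPid),
    Members = proplists:get_value(group_members, Info),
    GroupName = proplists:get_value(group_name, Info),
    %% once the group is finished with, drop its mnesia record
    ok = gm:forget_group(GroupName).

rabbit_mirror_queue_master:delete_and_terminate/2, further down in this
diff, is the consumer of exactly this pattern.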
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f80559ba..4a20a1bc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -578,7 +578,12 @@ flush_all(QPids, ChPid) ->
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_queue, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
+ end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
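
The guarded delete above reads the record first (mnesia:wread/1 takes a
write lock, so the check-then-delete stays race-free within the
transaction) and only issues the delete when a record exists, because a
delete on a disc-backed table is appended to the disk log even when it
deletes nothing. The same idiom as a generic sketch - the helper name is
illustrative, the mnesia calls are the real API:

    %% Delete Key from Tab only if a record exists, avoiding a
    %% pointless disk-log entry; must run inside an mnesia transaction.
    guarded_delete(Tab, Key) ->
        case mnesia:wread({Tab, Key}) of
            [] -> ok;
            [_] -> ok = mnesia:delete({Tab, Key})
        end.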
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d6a5523a..0e3f0bac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -560,18 +560,15 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- msg_seq_no = MsgSeqNo,
sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
case attempt_delivery(Delivery, Confirm, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
- %% the next two are optimisations
+ %% the next one is an optimisation
+ %% TODO: optimise the Confirm =/= never case too
{false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
discard_delivery(Delivery, State1);
- {false, State1 = #q{ttl = 0, dlx = undefined}} ->
- rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- discard_delivery(Delivery, State1);
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index e89951e7..c9475efd 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -20,7 +20,7 @@
%% A description proplist as with auth mechanisms,
%% exchanges. Currently unused.
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Check a user can log in, given a username and a proplist of
%% authentication information (e.g. [{password, Password}]).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index eda6a743..c7d74dc3 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -19,7 +19,7 @@
-ifdef(use_specs).
%% A description.
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% If this mechanism is enabled, should it be offered for a given socket?
%% (primarily so EXTERNAL can be SSL-only)
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index f0ea514d..2e462354 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -277,21 +277,15 @@ has_for_source(SrcName) ->
remove_for_source(SrcName) ->
lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- Routes = lists:usort(
- mnesia:match_object(rabbit_route, Match, write) ++
- mnesia:match_object(rabbit_durable_route, Match, write)),
- [begin
- sync_route(Route, fun mnesia:delete_object/3),
- Route#route.binding
- end || Route <- Routes].
+ remove_routes(
+ lists:usort(mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_durable_route, Match, write))).
-remove_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_routes/1).
-remove_transient_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -308,6 +302,14 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
+delete_object(Tab, Record, LockKind) ->
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:match_object(Tab, Record, LockKind) of
+ [] -> ok;
+ [_] -> mnesia:delete_object(Tab, Record, LockKind)
+ end.
+
sync_route(R, Fun) -> sync_route(R, true, true, Fun).
sync_route(Route, true, true, Fun) ->
@@ -370,16 +372,32 @@ lock_route_tables() ->
rabbit_semi_durable_route,
rabbit_durable_route]].
-remove_for_destination(DstName, DeleteFun) ->
+remove_routes(Routes) ->
+ %% This partitioning allows us to suppress unnecessary delete
+ %% operations on disk tables, which require an fsync.
+ {TransientRoutes, DurableRoutes} =
+ lists:partition(fun (R) -> mnesia:match_object(
+ rabbit_durable_route, R, write) == [] end,
+ Routes),
+ [ok = sync_transient_route(R, fun mnesia:delete_object/3) ||
+ R <- TransientRoutes],
+ [ok = sync_route(R, fun mnesia:delete_object/3) ||
+ R <- DurableRoutes],
+ [R#route.binding || R <- Routes].
+
+remove_transient_routes(Routes) ->
+ [begin
+ ok = sync_transient_route(R, fun delete_object/3),
+ R#route.binding
+ end || R <- Routes].
+
+remove_for_destination(DstName, Fun) ->
lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
- ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
- Bindings = [begin
- Route = reverse_route(ReverseRoute),
- ok = DeleteFun(Route),
- Route#route.binding
- end || ReverseRoute <- ReverseRoutes],
+ Routes = [reverse_route(R) || R <- mnesia:match_object(
+ rabbit_reverse_route, Match, write)],
+ Bindings = Fun(Routes),
group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
lists:keysort(#binding.source, Bindings)).
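
remove_routes/1 above leans on lists:partition/2, which splits a list by
a predicate in one pass, returning {Satisfying, NotSatisfying}. A
standalone shell sketch of the primitive (illustrative data only):

    1> lists:partition(fun (X) -> X rem 2 =:= 0 end, [1, 2, 3, 4, 5]).
    {[2,4],[1,3,5]}

Here the predicate is "has no durable copy", so transient-only routes
take the sync_transient_route/2 path and never touch the fsync-backed
durable tables.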
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a669a2b3..a3431321 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -31,7 +31,8 @@
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user()),
+-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() |
+ {rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
@@ -74,10 +75,17 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
{error, access_refused}
end;
+connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
+ connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid,
+ Infos);
+
connect(Username, VHost, Protocol, Pid, Infos) ->
+ connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos).
+
+connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true ->
- case rabbit_access_control:check_user_login(Username, []) of
+ case rabbit_access_control:FunctionName(U, P) of
{ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
{refused, _M, _A} -> {error, auth_failure}
end;
@@ -85,6 +93,7 @@ connect(Username, VHost, Protocol, Pid, Infos) ->
{error, broker_not_found_on_node}
end.
+
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
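
After this change connect/5 accepts three shapes for its first argument.
A hedged sketch of the call forms (VHost, Protocol, Pid and Infos stand
in for real values; the credential-tuple form is the one added here):

    %% 1. an already-authenticated #user{} record - only vhost access
    %%    is checked
    rabbit_direct:connect(User, VHost, Protocol, Pid, Infos),
    %% 2. a bare username - trusted, goes through check_user_login
    rabbit_direct:connect(<<"guest">>, VHost, Protocol, Pid, Infos),
    %% 3. a {Username, Password} pair - goes through
    %%    check_user_pass_login
    rabbit_direct:connect({<<"guest">>, <<"guest">>}, VHost, Protocol,
                          Pid, Infos).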
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 57c571f1..4cc96ef5 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -402,7 +402,12 @@ conditional_delete(X = #exchange{name = XName}) ->
end.
unconditional_delete(X = #exchange{name = XName}) ->
- ok = mnesia:delete({rabbit_durable_exchange, XName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_exchange, XName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName})
+ end,
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index b40ceda9..08819427 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -31,7 +31,7 @@
-type(tx() :: 'transaction' | 'none').
-type(serial() :: pos_integer() | tx()).
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Should Rabbit ensure that all binding events that are
%% delivered to an individual exchange can be serialised? (they
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 9a793aab..c5583ffd 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -21,7 +21,7 @@
-type(tx() :: 'transaction' | 'none').
-type(serial() :: pos_integer() | tx()).
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Should Rabbit ensure that all binding events that are
%% delivered to an individual exchange can be serialised? (they
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index a95f8f26..26f74796 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -105,9 +105,9 @@ with_fhc_handle(Fun) ->
with_fhc_handle(1, Fun).
with_fhc_handle(N, Fun) ->
- [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
+ ok = file_handle_cache:obtain(N),
try Fun()
- after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
+ after ok = file_handle_cache:release(N)
end.
read_term_file(File) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index fb9f7e34..c11a8ff7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,13 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ Info = gm:info(GM),
+ Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
+ node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
monitor_wait(MRefs),
+ ok = gm:forget_group(proplists:get_value(group_name, Info)),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 40600063..8ce19cc6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -123,37 +123,38 @@ init() ->
ensure_mnesia_dir(),
case is_virgin_node() of
true -> init_from_config();
- false -> normal_init(is_disc_node(), all_clustered_nodes())
+ false -> init(is_disc_node(), all_clustered_nodes())
end,
%% We intuitively expect the global name server to be synced when
- %% Mnesia is up. In fact that's not guaranteed to be the case - let's
- %% make it so.
+ %% Mnesia is up. In fact that's not guaranteed to be the case -
+ %% let's make it so.
ok = global:sync(),
ok.
-normal_init(DiscNode, AllNodes) ->
- init_db_and_upgrade(AllNodes, DiscNode, DiscNode).
+init(WantDiscNode, AllNodes) ->
+ init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode).
init_from_config() ->
- {ok, {TryNodes, DiscNode}} =
+ {ok, {TryNodes, WantDiscNode}} =
application:get_env(rabbit, cluster_nodes),
case find_good_node(TryNodes -- [node()]) of
{ok, Node} ->
rabbit_log:info("Node '~p' selected for clustering from "
"configuration~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster(Node),
- init_db_and_upgrade(DiscNodes, DiscNode, false),
+ init_db_and_upgrade(DiscNodes, WantDiscNode, false),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning("Could not find any suitable node amongst the "
"ones provided in the configuration: ~p~n",
[TryNodes]),
- normal_init(true, [node()])
+ init(true, [node()])
end.
-%% Make the node join a cluster. The node will be reset automatically before we
-%% actually cluster it. The nodes provided will be used to find out about the
-%% nodes in the cluster.
+%% Make the node join a cluster. The node will be reset automatically
+%% before we actually cluster it. The nodes provided will be used to
+%% find out about the nodes in the cluster.
+%%
%% This function will fail if:
%%
%% * The node is currently the only disc node of its cluster
@@ -161,17 +162,12 @@ init_from_config() ->
%% * The node is currently already clustered with the cluster of the nodes
%% provided
%%
-%% Note that we make no attempt to verify that the nodes provided are all in the
-%% same cluster, we simply pick the first online node and we cluster to its
-%% cluster.
+%% Note that we make no attempt to verify that the nodes provided are
+%% all in the same cluster, we simply pick the first online node and
+%% we cluster to its cluster.
join_cluster(DiscoveryNode, WantDiscNode) ->
case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of
- true -> throw({error,
- {standalone_ram_node,
- "You can't cluster a node if it's the only "
- "disc node in its existing cluster. If new nodes "
- "joined while this node was offline, use "
- "\"update_cluster_nodes\" to add them manually"}});
+ true -> e(clustering_only_disc_node);
_ -> ok
end,
@@ -184,16 +180,14 @@ join_cluster(DiscoveryNode, WantDiscNode) ->
end,
case lists:member(node(), ClusterNodes) of
- true -> throw({error, {already_clustered,
- "You are already clustered with the nodes you "
- "have selected"}});
+ true -> e(already_clustered);
false -> ok
end,
- %% reset the node. this simplifies things and it will be needed in this case
- %% - we're joining a new cluster with new nodes which are not in synch with
- %% the current node. I also lifts the burden of reseting the node from the
- %% user.
+ %% reset the node. this simplifies things and it will be needed in
+ %% this case - we're joining a new cluster with new nodes which
+ %% are not in sync with the current node. It also lifts the burden
+ %% of resetting the node from the user.
reset(false),
rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),
@@ -224,18 +218,14 @@ reset(Force) ->
false ->
AllNodes = all_clustered_nodes(),
%% Reconnecting so that we will get an up-to-date list of nodes.
- %% We don't need to check for consistency because we are resetting.
- %% Force=true here so that reset still works when clustered with a
- %% node which is down.
+ %% We don't need to check for consistency because we are
+ %% resetting. Force=true here so that reset still works
+ %% when clustered with a node which is down.
init_db_with_mnesia(AllNodes, is_disc_node(), false, true),
case is_disc_and_clustered() andalso
[node()] =:= clustered_disc_nodes()
of
- true -> throw({error, {standalone_ram_node,
- "You can't reset a node if it's the "
- "only disc node in a cluster. Please "
- "convert another node of the cluster "
- "to a disc node first."}});
+ true -> e(resetting_only_disc_node);
false -> ok
end,
leave_cluster(),
@@ -249,40 +239,26 @@ reset(Force) ->
ok = rabbit_node_monitor:reset_cluster_status(),
ok.
-%% We need to make sure that we don't end up in a distributed Erlang system with
-%% nodes while not being in an Mnesia cluster with them. We don't handle that
-%% well.
+%% We need to make sure that we don't end up in a distributed Erlang
+%% system with nodes while not being in an Mnesia cluster with
+%% them. We don't handle that well.
disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
change_cluster_node_type(Type) ->
ensure_mnesia_dir(),
ensure_mnesia_not_running(),
case is_clustered() of
- false -> throw({error, {not_clustered,
- "Non-clustered nodes can only be disc nodes"}});
+ false -> e(not_clustered);
true -> ok
end,
{_, _, RunningNodes} =
case discover_cluster(all_clustered_nodes()) of
- {ok, Status} ->
- Status;
- {error, _Reason} ->
- throw({error,
- {cannot_connect_to_cluster,
- "Could not connect to the cluster nodes present in "
- "this node status file. If the cluster has changed, "
- "you can use the \"update_cluster_nodes\" command to "
- "point to the new cluster nodes"}})
- end,
+ {ok, Status} -> Status;
+ {error, _Reason} -> e(cannot_connect_to_cluster)
+ end,
Node = case RunningNodes of
- [] ->
- throw({error,
- {no_online_cluster_nodes,
- "Could not find any online cluster nodes. If the "
- "cluster has changed, you can use the 'recluster' "
- "command."}});
- [Node0|_] ->
- Node0
+ [] -> e(no_online_cluster_nodes);
+ [Node0|_] -> Node0
end,
ok = reset(false),
ok = join_cluster(Node, case Type of
@@ -296,81 +272,63 @@ update_cluster_nodes(DiscoveryNode) ->
Status = {AllNodes, _, _} =
case discover_cluster(DiscoveryNode) of
- {ok, Status0} ->
- Status0;
- {error, _Reason} ->
- throw({error,
- {cannot_connect_to_node,
- "Could not connect to the cluster node provided"}})
+ {ok, Status0} -> Status0;
+ {error, _Reason} -> e(cannot_connect_to_node)
end,
case ordsets:is_element(node(), AllNodes) of
- true -> %% As in `check_consistency/0', we can safely delete the schema
- %% here, since it'll be replicated from the other nodes
- mnesia:delete_schema([node()]),
- rabbit_node_monitor:write_cluster_status(Status),
- init_db_with_mnesia(AllNodes, is_disc_node(), false);
- false -> throw({error,
- {inconsistent_cluster,
- "The nodes provided do not have this node as part of "
- "the cluster"}})
+ true ->
+ %% As in `check_consistency/0', we can safely delete the
+ %% schema here, since it'll be replicated from the other
+ %% nodes
+ mnesia:delete_schema([node()]),
+ rabbit_node_monitor:write_cluster_status(Status),
+ init_db_with_mnesia(AllNodes, is_disc_node(), false);
+ false ->
+ e(inconsistent_cluster)
end,
-
ok.
-%% We proceed like this: try to remove the node locally. If the node is offline,
-%% we remove the node if:
+%% We proceed like this: try to remove the node locally. If the node
+%% is offline, we remove the node if:
%% * This node is a disc node
%% * All other nodes are offline
-%% * This node was, at the best of our knowledge (see comment below) the last
-%% or second to last after the node we're removing to go down
+%% * This node was, to the best of our knowledge (see comment below)
+%% the last or second to last after the node we're removing to go
+%% down
forget_cluster_node(Node, RemoveWhenOffline) ->
case ordsets:is_element(Node, all_clustered_nodes()) of
true -> ok;
- false -> throw({error, {not_a_cluster_node,
- "The node selected is not in the cluster."}})
+ false -> e(not_a_cluster_node)
end,
case {mnesia:system_info(is_running), RemoveWhenOffline} of
- {yes, true} -> throw({error, {online_node_offline_flag,
- "You set the --offline flag, which is "
- "used to remove nodes remotely from "
- "offline nodes, but this node is "
- "online. "}});
+ {yes, true} -> e(online_node_offline_flag);
_ -> ok
end,
case remove_node_if_mnesia_running(Node) of
ok ->
ok;
+ {error, mnesia_not_running} when RemoveWhenOffline ->
+ remove_node_offline_node(Node);
{error, mnesia_not_running} ->
- case RemoveWhenOffline of
- true -> remove_node_offline_node(Node);
- false -> throw({error,
- {offline_node_no_offline_flag,
- "You are trying to remove a node from an "
- "offline node. That's dangerous, but can be "
- "done with the --offline flag. Please consult "
- "the manual for rabbitmqctl for more "
- "information."}})
- end;
+ e(offline_node_no_offline_flag);
Err = {error, _} ->
throw(Err)
end.
remove_node_offline_node(Node) ->
- case {ordsets:del_element(Node,
- running_nodes(all_clustered_nodes())),
- is_disc_node()}
- of
+ case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())),
+ is_disc_node()} of
{[], true} ->
- %% Note that while we check if the nodes was the last to go down,
- %% apart from the node we're removing from, this is still unsafe.
- %% Consider the situation in which A and B are clustered. A goes
- %% down, and records B as the running node. Then B gets clustered
- %% with C, C goes down and B goes down. In this case, C is the
- %% second-to-last, but we don't know that and we'll remove B from A
+ %% Note that while we check if the node was the last to
+ %% go down, apart from the node we're removing from, this
+ %% is still unsafe. Consider the situation in which A and
+ %% B are clustered. A goes down, and records B as the
+ %% running node. Then B gets clustered with C, C goes down
+ %% and B goes down. In this case, C is the second-to-last,
+ %% but we don't know that and we'll remove B from A
%% anyway, even if that will lead to bad things.
case ordsets:subtract(running_clustered_nodes(),
- ordsets:from_list([node(), Node]))
- of
+ ordsets:from_list([node(), Node])) of
[] -> start_mnesia(),
try
[mnesia:force_load_table(T) ||
@@ -380,20 +338,10 @@ remove_node_offline_node(Node) ->
after
stop_mnesia()
end;
- _ -> throw({error,
- {not_last_node_to_go_down,
- "The node you're trying to remove from was not "
- "the last to go down (excluding the node you are "
- "removing). Please use the the last node to go "
- "down to remove nodes when the cluster is "
- "offline."}})
+ _ -> e(not_last_node_to_go_down)
end;
{_, _} ->
- throw({error,
- {removing_node_from_offline_node,
- "To remove a node remotely from an offline node, the node "
- "you're removing from must be a disc node and all the "
- "other nodes must be offline."}})
+ e(removing_node_from_offline_node)
end.
@@ -420,63 +368,62 @@ is_clustered() ->
Nodes = all_clustered_nodes(),
[node()] =/= Nodes andalso [] =/= Nodes.
-is_disc_and_clustered() ->
- is_disc_node() andalso is_clustered().
+is_disc_and_clustered() -> is_disc_node() andalso is_clustered().
-%% Functions that retrieve the nodes in the cluster will rely on the status file
-%% if offline.
+%% Functions that retrieve the nodes in the cluster will rely on the
+%% status file if offline.
-all_clustered_nodes() ->
- cluster_status(all).
+all_clustered_nodes() -> cluster_status(all).
-clustered_disc_nodes() ->
- cluster_status(disc).
+clustered_disc_nodes() -> cluster_status(disc).
-clustered_ram_nodes() ->
- ordsets:subtract(cluster_status(all), cluster_status(disc)).
+clustered_ram_nodes() -> ordsets:subtract(cluster_status(all),
+ cluster_status(disc)).
-running_clustered_nodes() ->
- cluster_status(running).
+running_clustered_nodes() -> cluster_status(running).
running_clustered_disc_nodes() ->
{_, DiscNodes, RunningNodes} = cluster_status(),
ordsets:intersection(DiscNodes, RunningNodes).
-%% This function is the actual source of information, since it gets the data
-%% from mnesia. Obviously it'll work only when mnesia is running.
+%% This function is the actual source of information, since it gets
+%% the data from mnesia. Obviously it'll work only when mnesia is
+%% running.
mnesia_nodes() ->
case mnesia:system_info(is_running) of
- no -> {error, mnesia_not_running};
- yes -> %% If the tables are not present, it means that `init_db/3'
- %% hasn't been run yet. In other words, either we are a virgin
- %% node or a restarted RAM node. In both cases we're not
- %% interested in what mnesia has to say.
- IsDiscNode = mnesia:system_info(use_dir),
- Tables = mnesia:system_info(tables),
- {Table, _} = case table_definitions(case IsDiscNode of
- true -> disc;
- false -> ram
- end) of [T|_] -> T end,
- case lists:member(Table, Tables) of
- true ->
- AllNodes =
- ordsets:from_list(mnesia:system_info(db_nodes)),
- DiscCopies = ordsets:from_list(
- mnesia:table_info(schema, disc_copies)),
- DiscNodes =
- case IsDiscNode of
- true -> ordsets:add_element(node(), DiscCopies);
- false -> DiscCopies
- end,
- {ok, {AllNodes, DiscNodes}};
- false ->
- {error, tables_not_present}
- end
+ no ->
+ {error, mnesia_not_running};
+ yes ->
+ %% If the tables are not present, it means that
+ %% `init_db/3' hasn't been run yet. In other words, either
+ %% we are a virgin node or a restarted RAM node. In both
+ %% cases we're not interested in what mnesia has to say.
+ IsDiscNode = mnesia:system_info(use_dir),
+ Tables = mnesia:system_info(tables),
+ {Table, _} = case table_definitions(case IsDiscNode of
+ true -> disc;
+ false -> ram
+ end) of [T|_] -> T end,
+ case lists:member(Table, Tables) of
+ true ->
+ AllNodes =
+ ordsets:from_list(mnesia:system_info(db_nodes)),
+ DiscCopies = ordsets:from_list(
+ mnesia:table_info(schema, disc_copies)),
+ DiscNodes =
+ case IsDiscNode of
+ true -> ordsets:add_element(node(), DiscCopies);
+ false -> DiscCopies
+ end,
+ {ok, {AllNodes, DiscNodes}};
+ false ->
+ {error, tables_not_present}
+ end
end.
cluster_status(WhichNodes, ForceMnesia) ->
- %% I don't want to call `running_nodes/1' unless if necessary, since it can
- %% deadlock when stopping applications.
+ %% I don't want to call `running_nodes/1' unless necessary, since it's
+ %% pretty expensive.
Nodes = case mnesia_nodes() of
{ok, {AllNodes, DiscNodes}} ->
{ok, {AllNodes, DiscNodes,
@@ -509,11 +456,9 @@ cluster_status(WhichNodes) ->
{ok, Status} = cluster_status(WhichNodes, false),
Status.
-cluster_status() ->
- cluster_status(status).
+cluster_status() -> cluster_status(status).
-cluster_status_from_mnesia() ->
- cluster_status(status, true).
+cluster_status_from_mnesia() -> cluster_status(status, true).
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:version(),
@@ -525,21 +470,22 @@ is_disc_node() ->
dir() -> mnesia:system_info(directory).
-table_names() ->
- [Tab || {Tab, _} <- table_definitions()].
+table_names() -> [Tab || {Tab, _} <- table_definitions()].
%%----------------------------------------------------------------------------
%% Operations on the db
%%----------------------------------------------------------------------------
-%% Adds the provided nodes to the mnesia cluster, creating a new schema if there
-%% is the need to and catching up if there are other nodes in the cluster
-%% already. It also updates the cluster status file.
+%% Adds the provided nodes to the mnesia cluster, creating a new
+%% schema if needed and catching up if there are other
+%% nodes in the cluster already. It also updates the cluster status
+%% file.
init_db(ClusterNodes, WantDiscNode, Force) ->
Nodes = change_extra_db_nodes(ClusterNodes, Force),
- %% Note that we use `system_info' here and not the cluster status since when
- %% we start rabbit for the first time the cluster status will say we are a
- %% disc node but the tables won't be present yet.
+ %% Note that we use `system_info' here and not the cluster status
+ %% since when we start rabbit for the first time the cluster
+ %% status will say we are a disc node but the tables won't be
+ %% present yet.
WasDiscNode = mnesia:system_info(use_dir),
case {Nodes, WasDiscNode, WantDiscNode} of
{[], _, false} ->
@@ -556,11 +502,11 @@ init_db(ClusterNodes, WantDiscNode, Force) ->
ensure_version_ok(
rpc:call(AnotherNode, rabbit_version, recorded, [])),
ok = wait_for_replicated_tables(),
- %% The sequence in which we delete the schema and then the other
- %% tables is important: if we delete the schema first when moving to
- %% RAM mnesia will loudly complain since it doesn't make much sense
- %% to do that. But when moving to disc, we need to move the schema
- %% first.
+ %% The sequence in which we delete the schema and then the
+ %% other tables is important: if we delete the schema
+ %% first when moving to RAM mnesia will loudly complain
+ %% since it doesn't make much sense to do that. But when
+ %% moving to disc, we need to move the schema first.
case WantDiscNode of
true -> create_local_table_copy(schema, disc_copies),
create_local_table_copies(disc);
@@ -579,8 +525,8 @@ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
starting_from_scratch -> rabbit_version:record_desired();
version_not_available -> schema_ok_or_move()
end,
- %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget about the
- %% cluster
+ %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget
+ %% about the cluster
case WantDiscNode of
false -> start_mnesia(),
change_extra_db_nodes(ClusterNodes, true),
@@ -696,8 +642,8 @@ wait_for_tables(TableNames) ->
throw({error, {failed_waiting_for_tables, Reason}})
end.
-%% This does not guarantee us much, but it avoids some situations that will
-%% definitely end up badly
+%% This does not guarantee us much, but it avoids some situations that
+%% will definitely end up badly
check_cluster_consistency() ->
%% We want to find 0 or 1 consistent nodes.
case lists:foldl(
@@ -708,18 +654,21 @@ check_cluster_consistency() ->
of
{ok, Status = {RemoteAllNodes, _, _}} ->
case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of
- true -> ok;
- false -> %% We delete the schema here since we think we are
- %% clustered with nodes that are no longer in the
- %% cluster and there is no other way to remove them
- %% from our schema. On the other hand, we are sure
- %% that there is another online node that we can use
- %% to sync the tables with. There is a race here: if
- %% between this check and the `init_db' invocation the
- %% cluster gets disbanded, we're left with a node with
- %% no mnesia data that will try to connect to offline
- %% nodes.
- mnesia:delete_schema([node()])
+ true ->
+ ok;
+ false ->
+ %% We delete the schema here since we think we are
+ %% clustered with nodes that are no longer in the
+ %% cluster and there is no other way to remove
+ %% them from our schema. On the other hand, we are
+ %% sure that there is another online node that we
+ %% can use to sync the tables with. There is a
+ %% race here: if between this check and the
+ %% `init_db' invocation the cluster gets
+ %% disbanded, we're left with a node with no
+ %% mnesia data that will try to connect to offline
+ %% nodes.
+ mnesia:delete_schema([node()])
end,
rabbit_node_monitor:write_cluster_status(Status);
{error, not_found} ->
@@ -764,9 +713,7 @@ on_node_down(_Node) ->
discover_cluster(Nodes) when is_list(Nodes) ->
lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
(Node, {error, _}) -> discover_cluster(Node)
- end,
- {error, no_nodes_provided},
- Nodes);
+ end, {error, no_nodes_provided}, Nodes);
discover_cluster(Node) ->
OfflineError =
{error, {cannot_discover_cluster,
@@ -776,7 +723,8 @@ discover_cluster(Node) ->
{error, {cannot_discover_cluster,
"You provided the current node as node to cluster with"}};
false ->
- case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of
+ case rpc:call(Node,
+ rabbit_mnesia, cluster_status_from_mnesia, []) of
{badrpc, _Reason} -> OfflineError;
{error, mnesia_not_running} -> OfflineError;
{ok, Res} -> {ok, Res}
@@ -966,7 +914,8 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_version:record_desired().
-%% We only care about disc nodes since ram nodes are supposed to catch up only
+%% We only care about disc nodes since ram nodes are supposed to only
+%% catch up
create_schema() ->
stop_mnesia(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
@@ -1039,50 +988,39 @@ create_local_table_copy(Tab, Type) ->
remove_node_if_mnesia_running(Node) ->
case mnesia:system_info(is_running) of
- yes -> %% Deleting the the schema copy of the node will result in the
- %% node being removed from the cluster, with that change being
- %% propagated to all nodes
- case mnesia:del_table_copy(schema, Node) of
- {atomic, ok} ->
- rabbit_node_monitor:notify_left_cluster(Node),
- ok;
- {aborted, Reason} ->
- {error, {failed_to_remove_node, Node, Reason}}
- end;
- no -> {error, mnesia_not_running}
+ yes ->
+ %% Deleting the schema copy of the node will result in
+ %% the node being removed from the cluster, with that
+ %% change being propagated to all nodes
+ case mnesia:del_table_copy(schema, Node) of
+ {atomic, ok} ->
+ rabbit_node_monitor:notify_left_cluster(Node),
+ ok;
+ {aborted, Reason} ->
+ {error, {failed_to_remove_node, Node, Reason}}
+ end;
+ no ->
+ {error, mnesia_not_running}
end.
leave_cluster() ->
case {is_clustered(),
running_nodes(ordsets:del_element(node(), all_clustered_nodes()))}
of
- {false, []} ->
- ok;
- {_, AllNodes} ->
- case lists:any(
- fun (Node) ->
- case rpc:call(Node, rabbit_mnesia,
- remove_node_if_mnesia_running,
- [node()])
- of
- ok ->
- true;
- {error, mnesia_not_running} ->
- false;
- {error, Reason} ->
- throw({error, Reason});
- {badrpc, nodedown} ->
- false
- end
- end,
- AllNodes)
- of
- true -> ok;
- false -> throw({error,
- {no_running_cluster_nodes,
- "You cannot leave a cluster if no online "
- "nodes are present"}})
- end
+ {false, []} -> ok;
+ {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
+ end.
+
+leave_cluster(Node) ->
+ case rpc:call(Node,
+ rabbit_mnesia, remove_node_if_mnesia_running, [node()]) of
+ ok -> true;
+ {error, mnesia_not_running} -> false;
+ {error, Reason} -> throw({error, Reason});
+ {badrpc, nodedown} -> false
end.
wait_for(Condition) ->
@@ -1114,18 +1052,15 @@ change_extra_db_nodes(ClusterNodes0, Force) ->
Nodes
end.
-%% What we really want is nodes running rabbit, not running mnesia. Using
-%% `rabbit_mnesia:system_info(running_db_nodes)' will return false positives
-%% when we are actually just doing cluster operations (e.g. joining the
-%% cluster).
+%% We're not using `mnesia:system_info(running_db_nodes)' directly because if
+%% the node is a RAM node it won't know about other nodes when mnesia is stopped
running_nodes(Nodes) ->
{Replies, _BadNodes} =
rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
[Node || {Running, Node} <- Replies, Running].
is_running_remote() ->
- {proplists:is_defined(rabbit, application:which_applications(infinity)),
- node()}.
+ {mnesia:system_info(is_running) =:= yes, node()}.
check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
@@ -1162,31 +1097,74 @@ check_otp_consistency(Remote) ->
check_rabbit_consistency(Remote) ->
check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
-%% This is fairly tricky. We want to know if the node is in the state that a
-%% `reset' would leave it in. We cannot simply check if the mnesia tables
-%% aren't there because restarted RAM nodes won't have tables while still being
-%% non-virgin. What we do instead is to check if the mnesia directory is non
-%% existant or empty, with the exception of the cluster status file, which will
-%% be there thanks to `rabbit_node_monitor:prepare_cluster_status_file/0'.
+%% This is fairly tricky. We want to know if the node is in the state
+%% that a `reset' would leave it in. We cannot simply check if the
+%% mnesia tables aren't there because restarted RAM nodes won't have
+%% tables while still being non-virgin. What we do instead is to
+%% check if the mnesia directory is non-existent or empty, with the
+%% exception of the cluster status files, which will be there thanks to
+%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
is_virgin_node() ->
case rabbit_file:list_dir(dir()) of
{error, enoent} -> true;
- {ok, []} -> true;
- {ok, [File]} -> (dir() ++ "/" ++ File) =:=
- [rabbit_node_monitor:cluster_status_filename(),
- rabbit_node_monitor:running_nodes_filename()];
- {ok, _} -> false
+ {ok, []} -> true;
+ {ok, [File1, File2]} ->
+ lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:=
+ lists:usort([rabbit_node_monitor:cluster_status_filename(),
+ rabbit_node_monitor:running_nodes_filename()]);
+ {ok, _} -> false
end.
find_good_node([]) ->
none;
find_good_node([Node | Nodes]) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} ->
- find_good_node(Nodes);
- {OTP, Rabbit, _} ->
- case check_consistency(OTP, Rabbit) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
end.
+
+e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
+
+error_description(clustering_only_disc_node) ->
+ "You cannot cluster a node if it is the only disc node in its existing "
+ "cluster. If new nodes joined while this node was offline, use "
+ "\"update_cluster_nodes\" to add them manually.";
+error_description(resetting_only_disc_node) ->
+ "You cannot reset a node when it is the only disc node in a cluster. "
+ "Please convert another node of the cluster to a disc node first.";
+error_description(already_clustered) ->
+ "You are already clustered with the nodes you have selected.";
+error_description(not_clustered) ->
+ "Non-clustered nodes can only be disc nodes.";
+error_description(cannot_connect_to_cluster) ->
+ "Could not connect to the cluster nodes present in this node's "
+ "status file. If the cluster has changed, you can use the "
+ "\"update_cluster_nodes\" command to point to the new cluster nodes.";
+error_description(no_online_cluster_nodes) ->
+ "Could not find any online cluster nodes. If the cluster has changed, "
+ "you can use the 'recluster' command.";
+error_description(cannot_connect_to_node) ->
+ "Could not connect to the cluster node provided.";
+error_description(inconsistent_cluster) ->
+ "The nodes provided do not have this node as part of the cluster.";
+error_description(not_a_cluster_node) ->
+ "The node selected is not in the cluster.";
+error_description(online_node_offline_flag) ->
+ "You set the --offline flag, which is used to remove nodes remotely from "
+ "offline nodes, but this node is online.";
+error_description(offline_node_no_offline_flag) ->
+ "You are trying to remove a node from an offline node. That is dangerous, "
+ "but can be done with the --offline flag. Please consult the manual "
+ "for rabbitmqctl for more information.";
+error_description(not_last_node_to_go_down) ->
+ "The node you're trying to remove from was not the last to go down "
+ "(excluding the node you are removing). Please use the last node "
+ "to go down to remove nodes when the cluster is offline.";
+error_description(removing_node_from_offline_node) ->
+ "To remove a node remotely from an offline node, the node you're removing "
+ "from must be a disc node and all the other nodes must be offline.";
+error_description(no_running_cluster_nodes) ->
+ "You cannot leave a cluster if no online nodes are present.".