summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-21 13:13:07 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-21 13:13:07 +0100
commit7f9123f19674dc29a586b9f50c365818ed4f89cc (patch)
tree4e4811c1e8e4735251efee86beb0d1acb659701b
parent325ceeb402772b8484d4262bdea002dd9f9fc1a0 (diff)
parent2df7dad44e6290ce25361dc18e9087e041f7bccc (diff)
downloadrabbitmq-server-7f9123f19674dc29a586b9f50c365818ed4f89cc.tar.gz
merge default
-rw-r--r--Makefile2
-rw-r--r--src/file_handle_cache.erl59
-rw-r--r--src/gm.erl33
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_exchange_decorator.erl2
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_file.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mnesia.erl33
11 files changed, 86 insertions, 65 deletions
diff --git a/Makefile b/Makefile
index f3729cfa..c63e3dfd 100644
--- a/Makefile
+++ b/Makefile
@@ -147,7 +147,7 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
dialyzer --plt $(BASIC_PLT) --no_native --fullpath \
- -Wrace_conditions $(BEAM_TARGETS)
+ $(BEAM_TARGETS)
# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
create-plt: $(RABBIT_PLT)
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 68c095d2..3260d369 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -120,12 +120,12 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain, release and transfer. obtain/0
+%% The server also supports obtain, release and transfer. obtain/{0,1}
%% blocks until a file descriptor is available, at which point the
-%% requesting process is considered to 'own' one more
-%% descriptor. release/0 is the inverse operation and releases a
-%% previously obtained descriptor. transfer/1 transfers ownership of a
-%% file descriptor between processes. It is non-blocking. Obtain has a
+%% requesting process is considered to 'own' more descriptor(s).
+%% release/{0,1} is the inverse operation and releases previously obtained
+%% descriptor(s). transfer/{1,2} transfers ownership of file descriptor(s)
+%% between processes. It is non-blocking. Obtain has a
%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
%% the entire limit, but will be evicted by obtain calls up to the
%% point at which no more obtain calls can be satisfied by the obtains
@@ -136,8 +136,8 @@
%% as sockets can do so in such a way that the overall number of open
%% file descriptors is managed.
%%
-%% The callers of register_callback/3, obtain/0, and the argument of
-%% transfer/1 are monitored, reducing the count of handles in use
+%% The callers of register_callback/3, obtain, and the argument of
+%% transfer are monitored, reducing the count of handles in use
%% appropriately when the processes terminate.
-behaviour(gen_server2).
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
+-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
+ set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
@@ -251,8 +252,11 @@
-spec(clear/1 :: (ref()) -> ok_or_error()).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(obtain/1 :: (non_neg_integer()) -> 'ok').
-spec(release/0 :: () -> 'ok').
+-spec(release/1 :: (non_neg_integer()) -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
+-spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -485,18 +489,22 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain() ->
+obtain() -> obtain(1).
+release() -> release(1).
+transfer(Pid) -> transfer(Pid, 1).
+
+obtain(Count) when Count > 0 ->
%% If the FHC isn't running, obtains succeed immediately.
case whereis(?SERVER) of
undefined -> ok;
- _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
+ _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity)
end.
-release() ->
- gen_server2:cast(?SERVER, {release, self()}).
+release(Count) when Count > 0 ->
+ gen_server2:cast(?SERVER, {release, Count, self()}).
-transfer(Pid) ->
- gen_server2:cast(?SERVER, {transfer, self(), Pid}).
+transfer(Pid, Count) when Count > 0 ->
+ gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).
set_limit(Limit) ->
gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
@@ -842,7 +850,7 @@ init([AlarmSet, AlarmClear]) ->
prioritise_cast(Msg, _State) ->
case Msg of
- {release, _} -> 5;
+ {release, _, _} -> 5;
_ -> 0
end.
@@ -875,11 +883,12 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients }) ->
+handle_call({obtain, N, Pid}, From, State = #fhc_state {
+ obtain_count = Count,
+ obtain_pending = Pending,
+ clients = Clients }) ->
ok = track_client(Pid, Clients),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Item = #pending { kind = obtain, pid = Pid, requested = N, from = From },
Enqueue = fun () ->
true = ets:update_element(Clients, Pid,
{#cstate.blocked, true}),
@@ -890,7 +899,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
case obtain_limit_reached(State) of
true -> Enqueue();
false -> case needs_reduce(State #fhc_state {
- obtain_count = Count + 1 }) of
+ obtain_count = Count + N }) of
true -> reduce(Enqueue());
false -> adjust_alarm(
State, run_pending_item(Item, State))
@@ -925,9 +934,9 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
-handle_cast({release, Pid}, State) ->
+handle_cast({release, N, Pid}, State) ->
{noreply, adjust_alarm(State, process_pending(
- update_counts(obtain, Pid, -1, State)))};
+ update_counts(obtain, Pid, -N, State)))};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
@@ -939,11 +948,11 @@ handle_cast({close, Pid, EldestUnusedSince},
{noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1, State)))};
-handle_cast({transfer, FromPid, ToPid}, State) ->
+handle_cast({transfer, N, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
- update_counts(obtain, ToPid, +1,
- update_counts(obtain, FromPid, -1, State)))}.
+ update_counts(obtain, ToPid, +N,
+ update_counts(obtain, FromPid, -N, State)))}.
handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
diff --git a/src/gm.erl b/src/gm.erl
index f88ed18f..90433e84 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -77,9 +77,13 @@
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
-%% group_members/1
-%% Provide the Pid. Returns a list of the current group members.
+%% info/1
+%% Provide the Pid. Returns a proplist with various facts, including
+%% the group name and the current group members.
%%
+%% forget_group/1
+%% Provide the group name. Removes its mnesia record. Makes no attempt
+%% to ensure the group is empty.
%%
%% Implementation Overview
%% -----------------------
@@ -373,7 +377,7 @@
-behaviour(gen_server2).
-export([create_tables/0, start_link/3, leave/1, broadcast/2,
- confirmed_broadcast/2, group_members/1]).
+ confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
@@ -431,7 +435,8 @@
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
--spec(group_members/1 :: (pid()) -> [pid()]).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
@@ -514,9 +519,15 @@ broadcast(Server, Msg) ->
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
-group_members(Server) ->
- gen_server2:call(Server, group_members, infinity).
+info(Server) ->
+ gen_server2:call(Server, info, infinity).
+forget_group(GroupName) ->
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
+ ok.
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
@@ -553,12 +564,16 @@ handle_call({confirmed_broadcast, Msg}, _From,
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
-handle_call(group_members, _From,
+handle_call(info, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
-handle_call(group_members, _From, State = #state { view = View }) ->
- reply(get_pids(alive_view_members(View)), State);
+handle_call(info, _From, State = #state { group_name = GroupName,
+ module = Module,
+ view = View }) ->
+ reply([{group_name, GroupName},
+ {module, Module},
+ {group_members, get_pids(alive_view_members(View))}], State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 20ba4574..e647627c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -560,18 +560,15 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- msg_seq_no = MsgSeqNo,
sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
case attempt_delivery(Delivery, Confirm, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
- %% the next two are optimisations
+ %% the next one is an optimisations
+ %% TODO: optimise the Confirm =/= never case too
{false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
discard_delivery(Delivery, State1);
- {false, State1 = #q{ttl = 0, dlx = undefined}} ->
- rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- discard_delivery(Delivery, State1);
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index e89951e7..c9475efd 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -20,7 +20,7 @@
%% A description proplist as with auth mechanisms,
%% exchanges. Currently unused.
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Check a user can log in, given a username and a proplist of
%% authentication information (e.g. [{password, Password}]).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index eda6a743..c7d74dc3 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -19,7 +19,7 @@
-ifdef(use_specs).
%% A description.
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% If this mechanism is enabled, should it be offered for a given socket?
%% (primarily so EXTERNAL can be SSL-only)
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index b40ceda9..08819427 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -31,7 +31,7 @@
-type(tx() :: 'transaction' | 'none').
-type(serial() :: pos_integer() | tx()).
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Should Rabbit ensure that all binding events that are
%% delivered to an individual exchange can be serialised? (they
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 9a793aab..c5583ffd 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -21,7 +21,7 @@
-type(tx() :: 'transaction' | 'none').
-type(serial() :: pos_integer() | tx()).
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Should Rabbit ensure that all binding events that are
%% delivered to an individual exchange can be serialised? (they
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index a95f8f26..26f74796 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -105,9 +105,9 @@ with_fhc_handle(Fun) ->
with_fhc_handle(1, Fun).
with_fhc_handle(N, Fun) ->
- [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
+ ok = file_handle_cache:obtain(N),
try Fun()
- after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
+ after ok = file_handle_cache:release(N)
end.
read_term_file(File) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index fb9f7e34..c11a8ff7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,13 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ Info = gm:info(GM),
+ Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
+ node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
monitor_wait(MRefs),
+ ok = gm:forget_group(proplists:get_value(group_name, Info)),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a926a9c4..991f8c72 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -402,8 +402,8 @@ mnesia_nodes() ->
end.
cluster_status(WhichNodes) ->
- %% I don't want to call `running_nodes/1' unless if necessary,
- %% since it can deadlock when stopping applications.
+ %% I don't want to call `running_nodes/1' unless if necessary, since it's
+ %% pretty expensive.
{AllNodes1, DiscNodes1, RunningNodesThunk} =
case mnesia_nodes() of
{ok, {AllNodes, DiscNodes}} ->
@@ -411,10 +411,9 @@ cluster_status(WhichNodes) ->
{error, _Reason} ->
{AllNodes, DiscNodes, RunningNodes} =
rabbit_node_monitor:read_cluster_status(),
- %% The cluster status file records the status when the
- %% node is online, but we know for sure that the node
- %% is offline now, so we can remove it from the list
- %% of running nodes.
+ %% The cluster status file records the status when the node is
+ %% online, but we know for sure that the node is offline now, so
+ %% we can remove it from the list of running nodes.
{AllNodes, DiscNodes, fun() -> nodes_excl_me(RunningNodes) end}
end,
case WhichNodes of
@@ -1028,18 +1027,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-%% What we really want is nodes running rabbit, not running
-%% mnesia. Using `mnesia:system_info(running_db_nodes)' will
-%% return false positives when we are actually just doing cluster
-%% operations (e.g. joining the cluster).
+%% We're not using `mnesia:system_info(running_db_nodes)' directly because if
+%% the node is a RAM node it won't know about other nodes when mnesia is stopped
running_nodes(Nodes) ->
{Replies, _BadNodes} =
rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
[Node || {Running, Node} <- Replies, Running].
is_running_remote() ->
- {proplists:is_defined(rabbit, application:which_applications(infinity)),
- node()}.
+ {mnesia:system_info(is_running) =:= yes, node()}.
check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
@@ -1080,16 +1076,17 @@ check_rabbit_consistency(Remote) ->
%% mnesia tables aren't there because restarted RAM nodes won't have
%% tables while still being non-virgin. What we do instead is to
%% check if the mnesia directory is non existant or empty, with the
-%% exception of the cluster status file, which will be there thanks to
+%% exception of the cluster status files, which will be there thanks to
%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
is_virgin_node() ->
case rabbit_file:list_dir(dir()) of
{error, enoent} -> true;
- {ok, []} -> true;
- {ok, [File]} -> (dir() ++ "/" ++ File) =:=
- [rabbit_node_monitor:cluster_status_filename(),
- rabbit_node_monitor:running_nodes_filename()];
- {ok, _} -> false
+ {ok, []} -> true;
+ {ok, [File1, File2]} ->
+ lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:=
+ lists:usort([rabbit_node_monitor:cluster_status_filename(),
+ rabbit_node_monitor:running_nodes_filename()]);
+ {ok, _} -> false
end.
find_good_node([]) ->