diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-21 13:13:07 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-21 13:13:07 +0100 |
commit | 7f9123f19674dc29a586b9f50c365818ed4f89cc (patch) | |
tree | 4e4811c1e8e4735251efee86beb0d1acb659701b | |
parent | 325ceeb402772b8484d4262bdea002dd9f9fc1a0 (diff) | |
parent | 2df7dad44e6290ce25361dc18e9087e041f7bccc (diff) | |
download | rabbitmq-server-7f9123f19674dc29a586b9f50c365818ed4f89cc.tar.gz |
merge default
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 59 | ||||
-rw-r--r-- | src/gm.erl | 33 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_auth_backend.erl | 2 | ||||
-rw-r--r-- | src/rabbit_auth_mechanism.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_decorator.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 2 | ||||
-rw-r--r-- | src/rabbit_file.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 33 |
11 files changed, 86 insertions, 65 deletions
@@ -147,7 +147,7 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c dialyze: $(BEAM_TARGETS) $(BASIC_PLT) dialyzer --plt $(BASIC_PLT) --no_native --fullpath \ - -Wrace_conditions $(BEAM_TARGETS) + $(BEAM_TARGETS) # rabbit.plt is used by rabbitmq-erlang-client's dialyze make target create-plt: $(RABBIT_PLT) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 68c095d2..3260d369 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -120,12 +120,12 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain, release and transfer. obtain/0 +%% The server also supports obtain, release and transfer. obtain/{0,1} %% blocks until a file descriptor is available, at which point the -%% requesting process is considered to 'own' one more -%% descriptor. release/0 is the inverse operation and releases a -%% previously obtained descriptor. transfer/1 transfers ownership of a -%% file descriptor between processes. It is non-blocking. Obtain has a +%% requesting process is considered to 'own' more descriptor(s). +%% release/{0,1} is the inverse operation and releases previously obtained +%% descriptor(s). transfer/{1,2} transfers ownership of file descriptor(s) +%% between processes. It is non-blocking. Obtain has a %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use %% the entire limit, but will be evicted by obtain calls up to the %% point at which no more obtain calls can be satisfied by the obtains @@ -136,8 +136,8 @@ %% as sockets can do so in such a way that the overall number of open %% file descriptors is managed. %% -%% The callers of register_callback/3, obtain/0, and the argument of -%% transfer/1 are monitored, reducing the count of handles in use +%% The callers of register_callback/3, obtain, and the argument of +%% transfer are monitored, reducing the count of handles in use %% appropriately when the processes terminate. -behaviour(gen_server2). @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, +-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, + set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -251,8 +252,11 @@ -spec(clear/1 :: (ref()) -> ok_or_error()). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(obtain/0 :: () -> 'ok'). +-spec(obtain/1 :: (non_neg_integer()) -> 'ok'). -spec(release/0 :: () -> 'ok'). +-spec(release/1 :: (non_neg_integer()) -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). +-spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -485,18 +489,22 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain() -> +obtain() -> obtain(1). +release() -> release(1). +transfer(Pid) -> transfer(Pid, 1). + +obtain(Count) when Count > 0 -> %% If the FHC isn't running, obtains succeed immediately. case whereis(?SERVER) of undefined -> ok; - _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity) + _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity) end. -release() -> - gen_server2:cast(?SERVER, {release, self()}). +release(Count) when Count > 0 -> + gen_server2:cast(?SERVER, {release, Count, self()}). -transfer(Pid) -> - gen_server2:cast(?SERVER, {transfer, self(), Pid}). +transfer(Pid, Count) when Count > 0 -> + gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}). set_limit(Limit) -> gen_server2:call(?SERVER, {set_limit, Limit}, infinity). @@ -842,7 +850,7 @@ init([AlarmSet, AlarmClear]) -> prioritise_cast(Msg, _State) -> case Msg of - {release, _} -> 5; + {release, _, _} -> 5; _ -> 0 end. @@ -875,11 +883,12 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State)} end; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) -> +handle_call({obtain, N, Pid}, From, State = #fhc_state { + obtain_count = Count, + obtain_pending = Pending, + clients = Clients }) -> ok = track_client(Pid, Clients), - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + Item = #pending { kind = obtain, pid = Pid, requested = N, from = From }, Enqueue = fun () -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), @@ -890,7 +899,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, case obtain_limit_reached(State) of true -> Enqueue(); false -> case needs_reduce(State #fhc_state { - obtain_count = Count + 1 }) of + obtain_count = Count + N }) of true -> reduce(Enqueue()); false -> adjust_alarm( State, run_pending_item(Item, State)) @@ -925,9 +934,9 @@ handle_cast({update, Pid, EldestUnusedSince}, %% storm of messages {noreply, State}; -handle_cast({release, Pid}, State) -> +handle_cast({release, N, Pid}, State) -> {noreply, adjust_alarm(State, process_pending( - update_counts(obtain, Pid, -1, State)))}; + update_counts(obtain, Pid, -N, State)))}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> @@ -939,11 +948,11 @@ handle_cast({close, Pid, EldestUnusedSince}, {noreply, adjust_alarm(State, process_pending( update_counts(open, Pid, -1, State)))}; -handle_cast({transfer, FromPid, ToPid}, State) -> +handle_cast({transfer, N, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( - update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, State)))}. + update_counts(obtain, ToPid, +N, + update_counts(obtain, FromPid, -N, State)))}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; @@ -77,9 +77,13 @@ %% confirmed_broadcast/2 directly from the callback module otherwise %% you will deadlock the entire group. %% -%% group_members/1 -%% Provide the Pid. Returns a list of the current group members. +%% info/1 +%% Provide the Pid. Returns a proplist with various facts, including +%% the group name and the current group members. %% +%% forget_group/1 +%% Provide the group name. Removes its mnesia record. Makes no attempt +%% to ensure the group is empty. %% %% Implementation Overview %% ----------------------- @@ -373,7 +377,7 @@ -behaviour(gen_server2). -export([create_tables/0, start_link/3, leave/1, broadcast/2, - confirmed_broadcast/2, group_members/1]). + confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/2]). @@ -431,7 +435,8 @@ -spec(leave/1 :: (pid()) -> 'ok'). -spec(broadcast/2 :: (pid(), any()) -> 'ok'). -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). --spec(group_members/1 :: (pid()) -> [pid()]). +-spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(forget_group/1 :: (group_name()) -> 'ok'). %% The joined, members_changed and handle_msg callbacks can all return %% any of the following terms: @@ -514,9 +519,15 @@ broadcast(Server, Msg) -> confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). -group_members(Server) -> - gen_server2:call(Server, group_members, infinity). +info(Server) -> + gen_server2:call(Server, info, infinity). +forget_group(GroupName) -> + {atomic, ok} = mnesia:sync_transaction( + fun () -> + mnesia:delete({?GROUP_TABLE, GroupName}) + end), + ok. init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), @@ -553,12 +564,16 @@ handle_call({confirmed_broadcast, Msg}, _From, handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); -handle_call(group_members, _From, +handle_call(info, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); -handle_call(group_members, _From, State = #state { view = View }) -> - reply(get_pids(alive_view_members(View)), State); +handle_call(info, _From, State = #state { group_name = GroupName, + module = Module, + view = View }) -> + reply([{group_name, GroupName}, + {module, Module}, + {group_members, get_pids(alive_view_members(View))}], State); handle_call({add_on_right, _NewMember}, _From, State = #state { members_state = undefined }) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 20ba4574..e647627c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -560,18 +560,15 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, end. deliver_or_enqueue(Delivery = #delivery{message = Message, - msg_seq_no = MsgSeqNo, sender = SenderPid}, State) -> Confirm = should_confirm_message(Delivery, State), case attempt_delivery(Delivery, Confirm, State) of {true, State1} -> maybe_record_confirm_message(Confirm, State1); - %% the next two are optimisations + %% the next one is an optimisations + %% TODO: optimise the Confirm =/= never case too {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> discard_delivery(Delivery, State1); - {false, State1 = #q{ttl = 0, dlx = undefined}} -> - rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), - discard_delivery(Delivery, State1); {false, State1} -> State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index e89951e7..c9475efd 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -20,7 +20,7 @@ %% A description proplist as with auth mechanisms, %% exchanges. Currently unused. --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% Check a user can log in, given a username and a proplist of %% authentication information (e.g. [{password, Password}]). diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index eda6a743..c7d74dc3 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -19,7 +19,7 @@ -ifdef(use_specs). %% A description. --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% If this mechanism is enabled, should it be offered for a given socket? %% (primarily so EXTERNAL can be SSL-only) diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index b40ceda9..08819427 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -31,7 +31,7 @@ -type(tx() :: 'transaction' | 'none'). -type(serial() :: pos_integer() | tx()). --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% Should Rabbit ensure that all binding events that are %% delivered to an individual exchange can be serialised? (they diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 9a793aab..c5583ffd 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -21,7 +21,7 @@ -type(tx() :: 'transaction' | 'none'). -type(serial() :: pos_integer() | tx()). --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% Should Rabbit ensure that all binding events that are %% delivered to an individual exchange can be serialised? (they diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index a95f8f26..26f74796 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -105,9 +105,9 @@ with_fhc_handle(Fun) -> with_fhc_handle(1, Fun). with_fhc_handle(N, Fun) -> - [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)], + ok = file_handle_cache:obtain(N), try Fun() - after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)] + after ok = file_handle_cache:release(N) end. read_term_file(File) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index fb9f7e34..c11a8ff7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -127,10 +127,13 @@ terminate(Reason, delete_and_terminate(Reason, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], + Info = gm:info(GM), + Slaves = [Pid || Pid <- proplists:get_value(group_members, Info), + node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), monitor_wait(MRefs), + ok = gm:forget_group(proplists:get_value(group_name, Info)), State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a926a9c4..991f8c72 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -402,8 +402,8 @@ mnesia_nodes() -> end. cluster_status(WhichNodes) -> - %% I don't want to call `running_nodes/1' unless if necessary, - %% since it can deadlock when stopping applications. + %% I don't want to call `running_nodes/1' unless if necessary, since it's + %% pretty expensive. {AllNodes1, DiscNodes1, RunningNodesThunk} = case mnesia_nodes() of {ok, {AllNodes, DiscNodes}} -> @@ -411,10 +411,9 @@ cluster_status(WhichNodes) -> {error, _Reason} -> {AllNodes, DiscNodes, RunningNodes} = rabbit_node_monitor:read_cluster_status(), - %% The cluster status file records the status when the - %% node is online, but we know for sure that the node - %% is offline now, so we can remove it from the list - %% of running nodes. + %% The cluster status file records the status when the node is + %% online, but we know for sure that the node is offline now, so + %% we can remove it from the list of running nodes. {AllNodes, DiscNodes, fun() -> nodes_excl_me(RunningNodes) end} end, case WhichNodes of @@ -1028,18 +1027,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -%% What we really want is nodes running rabbit, not running -%% mnesia. Using `mnesia:system_info(running_db_nodes)' will -%% return false positives when we are actually just doing cluster -%% operations (e.g. joining the cluster). +%% We're not using `mnesia:system_info(running_db_nodes)' directly because if +%% the node is a RAM node it won't know about other nodes when mnesia is stopped running_nodes(Nodes) -> {Replies, _BadNodes} = rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []), [Node || {Running, Node} <- Replies, Running]. is_running_remote() -> - {proplists:is_defined(rabbit, application:which_applications(infinity)), - node()}. + {mnesia:system_info(is_running) =:= yes, node()}. check_consistency(OTP, Rabbit) -> rabbit_misc:sequence_error( @@ -1080,16 +1076,17 @@ check_rabbit_consistency(Remote) -> %% mnesia tables aren't there because restarted RAM nodes won't have %% tables while still being non-virgin. What we do instead is to %% check if the mnesia directory is non existant or empty, with the -%% exception of the cluster status file, which will be there thanks to +%% exception of the cluster status files, which will be there thanks to %% `rabbit_node_monitor:prepare_cluster_status_file/0'. is_virgin_node() -> case rabbit_file:list_dir(dir()) of {error, enoent} -> true; - {ok, []} -> true; - {ok, [File]} -> (dir() ++ "/" ++ File) =:= - [rabbit_node_monitor:cluster_status_filename(), - rabbit_node_monitor:running_nodes_filename()]; - {ok, _} -> false + {ok, []} -> true; + {ok, [File1, File2]} -> + lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:= + lists:usort([rabbit_node_monitor:cluster_status_filename(), + rabbit_node_monitor:running_nodes_filename()]); + {ok, _} -> false end. find_good_node([]) -> |