diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-29 16:02:48 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-29 16:02:48 +0100 |
commit | 302f0438a53f60cdb1760de76915d227e30bc9cc (patch) | |
tree | 7b4d248f97f11331d7fbb9a5b70df4ca87cb0efa | |
parent | d568194617e033749eeb757dd62c60127037cee0 (diff) | |
parent | 2a9af306d4afe7a3cbc83e8a0f0ae8b0b0a72487 (diff) | |
download | rabbitmq-server-302f0438a53f60cdb1760de76915d227e30bc9cc.tar.gz |
Merge bug25670
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | packaging/standalone/src/rabbit_release.erl | 4 | ||||
-rw-r--r-- | src/delegate.erl | 89 | ||||
-rw-r--r-- | src/gm.erl | 4 | ||||
-rw-r--r-- | src/pmon.erl | 42 | ||||
-rw-r--r-- | src/priority_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 3 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 3 |
10 files changed, 116 insertions, 46 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 635869a2..a4582e2d 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -68,5 +68,6 @@ rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, ssl_connection, tls_connection, ssl_record, tls_record, - gen_fsm, ssl]} + gen_fsm, ssl]}, + {ssl_apps, [asn1, crypto, public_key, ssl]} ]}]}. diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl index 26f36d68..9473cbda 100644 --- a/packaging/standalone/src/rabbit_release.erl +++ b/packaging/standalone/src/rabbit_release.erl @@ -54,7 +54,9 @@ start() -> end, %% we need a list of ERTS apps we need to ship with rabbit - BaseApps = AllApps -- PluginAppNames, + {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), + + BaseApps = SslAppsConfig ++ AllApps -- PluginAppNames, AppVersions = [determine_version(App) || App <- BaseApps], RabbitVersion = proplists:get_value(rabbit, AppVersions), diff --git a/src/delegate.erl b/src/delegate.erl index e833b819..30ee33b6 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,15 +18,22 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2, + demonitor/1, demonitor/2, call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(state, {node, monitors, name}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([monitor_ref/0]). + +-type(monitor_ref() :: reference() | {atom(), pid()}). + -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). -spec(invoke/2 :: @@ -35,6 +42,10 @@ [{pid(), term()}]}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(monitor/2 :: ('process', pid()) -> monitor_ref()). +-spec(demonitor/1 :: (monitor_ref()) -> 'true'). +-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true'). + -spec(call/2 :: ( pid(), any()) -> any(); ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}). @@ -50,7 +61,8 @@ %%---------------------------------------------------------------------------- start_link(Num) -> - gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). + Name = delegate_name(Num), + gen_server2:start_link({local, Name}, ?MODULE, [Name], []). invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> Fun(Pid); @@ -78,7 +90,7 @@ invoke(Pids, Fun) when is_list(Pids) -> case orddict:fetch_keys(Grouped) of [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( - RemoteNodes, delegate(RemoteNodes), + RemoteNodes, delegate(self(), RemoteNodes), {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || @@ -106,12 +118,27 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), - {invoke, Fun, Grouped}) + RemoteNodes -> gen_server2:abcast( + RemoteNodes, delegate(self(), RemoteNodes), + {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die ok. +monitor(Type, Pid) when node(Pid) =:= node() -> + erlang:monitor(Type, Pid); +monitor(Type, Pid) -> + Name = delegate(Pid, [node(Pid)]), + gen_server2:cast(Name, {monitor, Type, self(), Pid}), + {Name, Pid}. + +demonitor(Ref) -> ?MODULE:demonitor(Ref, []). + +demonitor(Ref, Options) when is_reference(Ref) -> + erlang:demonitor(Ref, Options); +demonitor({Name, Pid}, Options) -> + gen_server2:cast(Name, {demonitor, Pid, Options}). + call(PidOrPids, Msg) -> invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). @@ -134,10 +161,10 @@ group_pids_by_node(Pids) -> delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate(RemoteNodes) -> +delegate(Pid, RemoteNodes) -> case get(delegate) of undefined -> Name = delegate_name( - erlang:phash2(self(), + erlang:phash2(Pid, delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; @@ -155,22 +182,48 @@ safe_invoke(Pid, Fun) when is_pid(Pid) -> %%---------------------------------------------------------------------------- -init([]) -> - {ok, node(), hibernate, +init([Name]) -> + {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, Node) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. - -handle_cast({invoke, Fun, Grouped}, Node) -> +handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}. + +handle_cast({monitor, Type, WantsMonitor, Pid}, + State = #state{monitors = Monitors}) -> + Ref = erlang:monitor(Type, Pid), + Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors), + {noreply, State#state{monitors = Monitors1}, hibernate}; + +handle_cast({demonitor, Pid, Options}, + State = #state{monitors = Monitors}) -> + {noreply, case dict:find(Pid, Monitors) of + {ok, {_WantsMonitor, Ref}} -> + erlang:demonitor(Ref, Options), + State#state{monitors = dict:erase(Pid, Monitors)}; + error -> + State + end, hibernate}; + +handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> safe_invoke(orddict:fetch(Node, Grouped), Fun), - {noreply, Node, hibernate}. + {noreply, State, hibernate}. + +handle_info({'DOWN', Ref, process, Pid, Info}, + State = #state{monitors = Monitors, name = Name}) -> + {noreply, case dict:find(Pid, Monitors) of + {ok, {WantsMonitor, Ref}} -> + WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info}, + State#state{monitors = dict:erase(Pid, Monitors)}; + error -> + State + end, hibernate}; -handle_info(_Info, Node) -> - {noreply, Node, hibernate}. +handle_info(_Info, State) -> + {noreply, State, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, Node, _Extra) -> - {ok, Node}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. @@ -1076,7 +1076,7 @@ prune_or_create_group(Self, GroupName, TxnFun) -> fun () -> GroupNew = #gm_group { name = GroupName, members = [Self], - version = ?VERSION_START }, + version = get_version(Self) }, case mnesia:read({?GROUP_TABLE, GroupName}) of [] -> mnesia:write(GroupNew), @@ -1317,6 +1317,8 @@ remove_erased_members(MembersState, View) -> MembersState1) end, blank_member_state(), all_known_members(View)). +get_version({Version, _Pid}) -> Version. + get_pid({_Version, Pid}) -> Pid. get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. diff --git a/src/pmon.erl b/src/pmon.erl index ed32b8b2..1e31eb60 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -16,22 +16,26 @@ -module(pmon). --export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - monitored/1, is_empty/1]). +-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2, + is_monitored/2, erase/2, monitored/1, is_empty/1]). -compile({no_auto_import, [monitor/2]}). +-record(state, {dict, module}). + -ifdef(use_specs). %%---------------------------------------------------------------------------- -export_type([?MODULE/0]). --opaque(?MODULE() :: dict()). +-opaque(?MODULE() :: #state{dict :: dict(), + module :: atom()}). -type(item() :: pid() | {atom(), node()}). -spec(new/0 :: () -> ?MODULE()). +-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()). -spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). -spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). @@ -42,29 +46,33 @@ -endif. -new() -> dict:new(). +new() -> new(erlang). + +new(Module) -> #state{dict = dict:new(), + module = Module}. -monitor(Item, M) -> +monitor(Item, S = #state{dict = M, module = Module}) -> case dict:is_key(Item, M) of - true -> M; - false -> dict:store(Item, erlang:monitor(process, Item), M) + true -> S; + false -> S#state{dict = dict:store( + Item, Module:monitor(process, Item), M)} end. -monitor_all([], M) -> M; %% optimisation -monitor_all([Item], M) -> monitor(Item, M); %% optimisation -monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). +monitor_all([], S) -> S; %% optimisation +monitor_all([Item], S) -> monitor(Item, S); %% optimisation +monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items). -demonitor(Item, M) -> +demonitor(Item, S = #state{dict = M, module = Module}) -> case dict:find(Item, M) of - {ok, MRef} -> erlang:demonitor(MRef), - dict:erase(Item, M); + {ok, MRef} -> Module:demonitor(MRef), + S#state{dict = dict:erase(Item, M)}; error -> M end. -is_monitored(Item, M) -> dict:is_key(Item, M). +is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M). -erase(Item, M) -> dict:erase(Item, M). +erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}. -monitored(M) -> dict:fetch_keys(M). +monitored(#state{dict = M}) -> dict:fetch_keys(M). -is_empty(M) -> dict:size(M) == 0. +is_empty(#state{dict = M}) -> dict:size(M) == 0. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 0dc19819..18e1e8d9 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -51,7 +51,7 @@ -type(q() :: pqueue()). -type(priority() :: integer() | 'infinity'). --type(squeue() :: {queue, [any()], [any()]}). +-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}). -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). -spec(new/0 :: () -> pqueue()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5409a806..b30af033 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -146,7 +146,7 @@ init_state(Q) -> exclusive_consumer = none, has_had_consumers = false, active_consumers = queue:new(), - senders = pmon:new(), + senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running}, rabbit_event:init_stats_timer(State, #q.stats_timer). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 5e83e8a4..8c6e4c7a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -120,7 +120,7 @@ init(Q = #amqqueue { name = QName }) -> msg_id_ack = dict:new(), msg_id_status = dict:new(), - known_senders = pmon:new(), + known_senders = pmon:new(delegate), depth_delta = undefined }, @@ -274,7 +274,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, noreply(State); handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> - noreply(local_sender_death(ChPid, State)); + local_sender_death(ChPid, State), + noreply(State); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -605,7 +606,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = pmon:monitor(ChPid, KS) }. -local_sender_death(ChPid, State = #state { known_senders = KS }) -> +local_sender_death(ChPid, #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a delivery %% from it but not heard about its death from the master. So if it %% is monitored we need to point the death out to the master (see @@ -613,8 +614,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> confirm_sender_death(ChPid) - end, - State. + end. confirm_sender_death(Pid) -> %% We have to deal with the possibility that we'll be promoted to diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 702df040..ebbedab8 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -145,7 +145,8 @@ start() -> rabbit_sup:start_supervisor_child( {rabbit_connection_sup,start_link,[]}]). ensure_ssl() -> - ok = app_utils:start_applications([asn1, crypto, public_key, ssl]), + {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), + ok = app_utils:start_applications(SslAppsConfig), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), % unknown_ca errors are silently ignored prior to R14B unless we diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3cf88d06..0fa812c6 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -756,6 +756,9 @@ refuse_connection(Sock, Exception, {A, B, C, D}) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end), throw(Exception). +-ifdef(use_specs). +-spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()). +-endif. refuse_connection(Sock, Exception) -> refuse_connection(Sock, Exception, {0, 0, 9, 1}). |