summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-29 16:02:48 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-29 16:02:48 +0100
commit302f0438a53f60cdb1760de76915d227e30bc9cc (patch)
tree7b4d248f97f11331d7fbb9a5b70df4ca87cb0efa
parentd568194617e033749eeb757dd62c60127037cee0 (diff)
parent2a9af306d4afe7a3cbc83e8a0f0ae8b0b0a72487 (diff)
downloadrabbitmq-server-302f0438a53f60cdb1760de76915d227e30bc9cc.tar.gz
Merge bug25670
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--packaging/standalone/src/rabbit_release.erl4
-rw-r--r--src/delegate.erl89
-rw-r--r--src/gm.erl4
-rw-r--r--src/pmon.erl42
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
-rw-r--r--src/rabbit_networking.erl3
-rw-r--r--src/rabbit_reader.erl3
10 files changed, 116 insertions, 46 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 635869a2..a4582e2d 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -68,5 +68,6 @@
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow,
pmon, ssl_connection, tls_connection, ssl_record, tls_record,
- gen_fsm, ssl]}
+ gen_fsm, ssl]},
+ {ssl_apps, [asn1, crypto, public_key, ssl]}
]}]}.
diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl
index 26f36d68..9473cbda 100644
--- a/packaging/standalone/src/rabbit_release.erl
+++ b/packaging/standalone/src/rabbit_release.erl
@@ -54,7 +54,9 @@ start() ->
end,
%% we need a list of ERTS apps we need to ship with rabbit
- BaseApps = AllApps -- PluginAppNames,
+ {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
+
+ BaseApps = SslAppsConfig ++ AllApps -- PluginAppNames,
AppVersions = [determine_version(App) || App <- BaseApps],
RabbitVersion = proplists:get_value(rabbit, AppVersions),
diff --git a/src/delegate.erl b/src/delegate.erl
index e833b819..30ee33b6 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,15 +18,22 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2,
+ demonitor/1, demonitor/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-record(state, {node, monitors, name}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([monitor_ref/0]).
+
+-type(monitor_ref() :: reference() | {atom(), pid()}).
+
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
@@ -35,6 +42,10 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true').
+
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -50,7 +61,8 @@
%%----------------------------------------------------------------------------
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
Fun(Pid);
@@ -78,7 +90,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(RemoteNodes),
+ RemoteNodes, delegate(self(), RemoteNodes),
{invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
@@ -106,12 +118,27 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped})
+ RemoteNodes -> gen_server2:abcast(
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
ok.
+monitor(Type, Pid) when node(Pid) =:= node() ->
+ erlang:monitor(Type, Pid);
+monitor(Type, Pid) ->
+ Name = delegate(Pid, [node(Pid)]),
+ gen_server2:cast(Name, {monitor, Type, self(), Pid}),
+ {Name, Pid}.
+
+demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
+
+demonitor(Ref, Options) when is_reference(Ref) ->
+ erlang:demonitor(Ref, Options);
+demonitor({Name, Pid}, Options) ->
+ gen_server2:cast(Name, {demonitor, Pid, Options}).
+
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -134,10 +161,10 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(RemoteNodes) ->
+delegate(Pid, RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(),
+ erlang:phash2(Pid,
delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
@@ -155,22 +182,48 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, node(), hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, Fun, Grouped}, _From, Node) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-
-handle_cast({invoke, Fun, Grouped}, Node) ->
+handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
+
+handle_cast({monitor, Type, WantsMonitor, Pid},
+ State = #state{monitors = Monitors}) ->
+ Ref = erlang:monitor(Type, Pid),
+ Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({demonitor, Pid, Options},
+ State = #state{monitors = Monitors}) ->
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {_WantsMonitor, Ref}} ->
+ erlang:demonitor(Ref, Options),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
+
+handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
- {noreply, Node, hibernate}.
+ {noreply, State, hibernate}.
+
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {WantsMonitor, Ref}} ->
+ WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
-handle_info(_Info, Node) ->
- {noreply, Node, hibernate}.
+handle_info(_Info, State) ->
+ {noreply, State, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, Node, _Extra) ->
- {ok, Node}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/gm.erl b/src/gm.erl
index 76e769d9..7817dfff 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1076,7 +1076,7 @@ prune_or_create_group(Self, GroupName, TxnFun) ->
fun () ->
GroupNew = #gm_group { name = GroupName,
members = [Self],
- version = ?VERSION_START },
+ version = get_version(Self) },
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
@@ -1317,6 +1317,8 @@ remove_erased_members(MembersState, View) ->
MembersState1)
end, blank_member_state(), all_known_members(View)).
+get_version({Version, _Pid}) -> Version.
+
get_pid({_Version, Pid}) -> Pid.
get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
diff --git a/src/pmon.erl b/src/pmon.erl
index ed32b8b2..1e31eb60 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,22 +16,26 @@
-module(pmon).
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, is_empty/1]).
+-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
+ is_monitored/2, erase/2, monitored/1, is_empty/1]).
-compile({no_auto_import, [monitor/2]}).
+-record(state, {dict, module}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
-export_type([?MODULE/0]).
--opaque(?MODULE() :: dict()).
+-opaque(?MODULE() :: #state{dict :: dict(),
+ module :: atom()}).
-type(item() :: pid() | {atom(), node()}).
-spec(new/0 :: () -> ?MODULE()).
+-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
@@ -42,29 +46,33 @@
-endif.
-new() -> dict:new().
+new() -> new(erlang).
+
+new(Module) -> #state{dict = dict:new(),
+ module = Module}.
-monitor(Item, M) ->
+monitor(Item, S = #state{dict = M, module = Module}) ->
case dict:is_key(Item, M) of
- true -> M;
- false -> dict:store(Item, erlang:monitor(process, Item), M)
+ true -> S;
+ false -> S#state{dict = dict:store(
+ Item, Module:monitor(process, Item), M)}
end.
-monitor_all([], M) -> M; %% optimisation
-monitor_all([Item], M) -> monitor(Item, M); %% optimisation
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], S) -> S; %% optimisation
+monitor_all([Item], S) -> monitor(Item, S); %% optimisation
+monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
-demonitor(Item, M) ->
+demonitor(Item, S = #state{dict = M, module = Module}) ->
case dict:find(Item, M) of
- {ok, MRef} -> erlang:demonitor(MRef),
- dict:erase(Item, M);
+ {ok, MRef} -> Module:demonitor(MRef),
+ S#state{dict = dict:erase(Item, M)};
error -> M
end.
-is_monitored(Item, M) -> dict:is_key(Item, M).
+is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
-erase(Item, M) -> dict:erase(Item, M).
+erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
-monitored(M) -> dict:fetch_keys(M).
+monitored(#state{dict = M}) -> dict:fetch_keys(M).
-is_empty(M) -> dict:size(M) == 0.
+is_empty(#state{dict = M}) -> dict:size(M) == 0.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 0dc19819..18e1e8d9 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -51,7 +51,7 @@
-type(q() :: pqueue()).
-type(priority() :: integer() | 'infinity').
--type(squeue() :: {queue, [any()], [any()]}).
+-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5409a806..b30af033 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -146,7 +146,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = queue:new(),
- senders = pmon:new(),
+ senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 5e83e8a4..8c6e4c7a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -120,7 +120,7 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = pmon:new(),
+ known_senders = pmon:new(delegate),
depth_delta = undefined
},
@@ -274,7 +274,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
- noreply(local_sender_death(ChPid, State));
+ local_sender_death(ChPid, State),
+ noreply(State);
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -605,7 +606,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
@@ -613,8 +614,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
- end,
- State.
+ end.
confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 702df040..ebbedab8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -145,7 +145,8 @@ start() -> rabbit_sup:start_supervisor_child(
{rabbit_connection_sup,start_link,[]}]).
ensure_ssl() ->
- ok = app_utils:start_applications([asn1, crypto, public_key, ssl]),
+ {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
+ ok = app_utils:start_applications(SslAppsConfig),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
% unknown_ca errors are silently ignored prior to R14B unless we
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3cf88d06..0fa812c6 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -756,6 +756,9 @@ refuse_connection(Sock, Exception, {A, B, C, D}) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
throw(Exception).
+-ifdef(use_specs).
+-spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()).
+-endif.
refuse_connection(Sock, Exception) ->
refuse_connection(Sock, Exception, {0, 0, 9, 1}).