summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-10 14:54:18 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-10 14:54:18 +0000
commit58bd26a768277206f24901167850bc9c79c9f106 (patch)
tree450676422714cbb6bcbb2eeec20bf4f911b5f9ca
parent97a842805979bdea82dcd88e3f043c5b19052e3c (diff)
parentfe1201a959e3cebd642f04d4e30e0274a28cba4f (diff)
downloadrabbitmq-server-58bd26a768277206f24901167850bc9c79c9f106.tar.gz
Merge in default
-rw-r--r--src/gen_server2.erl27
-rw-r--r--src/gm.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_connection_helper_sup.erl9
-rw-r--r--src/rabbit_heartbeat.erl55
-rw-r--r--src/rabbit_limiter.erl8
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_mirror_queue_sync.erl12
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_net.erl7
-rw-r--r--src/rabbit_queue_collector.erl17
-rw-r--r--src/rabbit_reader.erl11
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_writer.erl49
19 files changed, 158 insertions, 92 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 6690d181..b0212e47 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -81,6 +81,10 @@
%% process as sys:get_status/1 would). Pass through a function which
%% can be invoked on the state, get back the result. The state is not
%% modified.
+%%
+%% 10) The Options parameter to start / start_link can include
+%% {proc_name, Name}. This name is stored in the process dictionary
+%% for later debugging.
%% All modifications are (C) 2009-2013 GoPivotal, Inc.
@@ -283,7 +287,7 @@ behaviour_info(_Other) ->
%%% Name ::= {local, atom()} | {global, atom()}
%%% Mod ::= atom(), callback module implementing the 'real' server
%%% Args ::= term(), init arguments (to Mod:init/1)
-%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%% Options ::= [{timeout, Timeout} | {debug, [Flag]} | {proc_name, term()}]
%%% Flag ::= trace | log | {logfile, File} | statistics | debug
%%% (debug == log && statistics)
%%% Returns: {ok, Pid} |
@@ -463,6 +467,10 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
mod = Mod,
queue = Queue,
debug = Debug }),
+ case opt(proc_name, Options) of
+ {ok, ProcName} -> put(process_name, {Mod, ProcName});
+ false -> ok
+ end,
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
@@ -901,9 +909,17 @@ common_noreply(_Name, _NState, [] = _Debug) ->
common_noreply(Name, NState, Debug) ->
sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}).
-common_become(_Name, _Mod, _NState, [] = _Debug) ->
+
+common_become(Name, OldMod, NewMod, NState, Debug) ->
+ case get(process_name) of
+ {OldMod, ProcName} -> put(process_name, {NewMod, ProcName});
+ _ -> ok
+ end,
+ common_become0(Name, NewMod, NState, Debug).
+
+common_become0(_Name, _Mod, _NState, [] = _Debug) ->
[];
-common_become(Name, Mod, NState, Debug) ->
+common_become0(Name, Mod, NState, Debug) ->
sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}).
handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
@@ -935,6 +951,7 @@ handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) ->
handle_common_reply(Reply, Msg, GS2State).
handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
+ mod = Mod0,
debug = Debug}) ->
case Reply of
{noreply, NState} ->
@@ -948,14 +965,14 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
time = Time1,
debug = Debug1});
{become, Mod, NState} ->
- Debug1 = common_become(Name, Mod, NState, Debug),
+ Debug1 = common_become(Name, Mod0, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
time = infinity,
debug = Debug1 }));
{become, Mod, NState, Time1} ->
- Debug1 = common_become(Name, Mod, NState, Debug),
+ Debug1 = common_become(Name, Mod0, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
diff --git a/src/gm.erl b/src/gm.erl
index cddb2a3b..25cfad9b 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -517,7 +517,8 @@ table_definitions() ->
[{Name, [?TABLE_MATCH | Attributes]}].
start_link(GroupName, Module, Args, TxnFun) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
+ gen_server2:start_link(
+ ?MODULE, [GroupName, Module, Args, TxnFun], [{proc_name, GroupName}]).
leave(Server) ->
gen_server2:cast(Server, leave).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e5c283d0..9cb5d6d0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -99,7 +99,8 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, Q, [{proc_name, Q#amqqueue.name}]).
info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fe9faf86..0c786c07 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -119,7 +119,8 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, CollectorPid, Limiter) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, CollectorPid, Limiter], []).
+ User, VHost, Capabilities, CollectorPid, Limiter],
+ [{proc_name, {ConnName, Channel}}]).
do(Pid, Method) ->
do(Pid, Method, none).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index df2e80ca..26f9700e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,9 +47,9 @@
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE,
- {tcp, Sock, Channel, FrameMax,
- ReaderPid, Protocol}),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {tcp, Sock, Channel, FrameMax,
+ ReaderPid, Protocol, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
[WriterPid] = supervisor2:find_child(SupPid, writer),
{ok, ChannelPid} =
@@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE, direct),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
{ok, ChannelPid} =
supervisor2:start_child(
@@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
init(Type) ->
{ok, {{one_for_all, 0, 1}, child_specs(Type)}}.
-child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) ->
+child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) ->
[{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid, true]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
-child_specs(direct) ->
- [{limiter, {rabbit_limiter, start_link, []},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}
+ | child_specs({direct, Identity})];
+child_specs({direct, Identity}) ->
+ [{limiter, {rabbit_limiter, start_link, [Identity]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]}].
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index e51615e8..f268d8d6 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -20,7 +20,7 @@
-export([start_link/0]).
-export([start_channel_sup_sup/1,
- start_queue_collector/1]).
+ start_queue_collector/2]).
-export([init/1]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
--spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/2 :: (pid(), rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) ->
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
-start_queue_collector(SupPid) ->
+start_queue_collector(SupPid, Identity) ->
supervisor2:start_child(
SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
+ {collector, {rabbit_queue_collector, start_link, [Identity]},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ca67254b..ff9de67a 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,8 @@
-module(rabbit_heartbeat).
--export([start/6]).
--export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+-export([start/6, start/7]).
+-export([start_heartbeat_sender/4, start_heartbeat_receiver/4,
pause_monitor/1, resume_monitor/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -39,12 +39,17 @@
non_neg_integer(), heartbeat_callback(),
non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
--spec(start_heartbeat_sender/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
--spec(start_heartbeat_receiver/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
+-spec(start/7 ::
+ (pid(), rabbit_net:socket(), rabbit_types:proc_name(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
+
+-spec(start_heartbeat_sender/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -56,31 +61,35 @@
-endif.
%%----------------------------------------------------------------------------
-
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ start(SupPid, Sock, unknown,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun).
+
+start(SupPid, Sock, Identity,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
{ok, Sender} =
start_heartbeater(SendTimeoutSec, SupPid, Sock,
SendFun, heartbeat_sender,
- start_heartbeat_sender),
+ start_heartbeat_sender, Identity),
{ok, Receiver} =
start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
+ start_heartbeat_receiver, Identity),
{Sender, Receiver}.
-start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () -> SendFun(), continue end}).
+ fun () -> SendFun(), continue end}, Identity).
-start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () -> ReceiveFun(), stop end}).
+ fun () -> ReceiveFun(), stop end}, Identity).
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
@@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
%%----------------------------------------------------------------------------
-start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback,
+ _Identity) ->
{ok, none};
-start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback,
+ Identity) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
-heartbeater(Params) ->
+heartbeater(Params, Identity) ->
Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}.
+ {ok, proc_lib:spawn_link(fun () ->
+ rabbit_misc:store_proc_name(Identity),
+ heartbeater(Params, Deb, {0, 0})
+ end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Deb, {StatVal, SameCount} = State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d5cfbce6..fb81dbf5 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -119,7 +119,7 @@
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/1]).
%% channel API
-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
is_prefetch_limited/1, is_blocked/1, is_active/1,
@@ -145,7 +145,8 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
@@ -193,7 +194,8 @@
%% API
%%----------------------------------------------------------------------------
-start_link() -> gen_server2:start_link(?MODULE, [], []).
+start_link(ProcName) ->
+ gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index a0e8bcc6..0a034b6d 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -310,7 +310,8 @@
%%----------------------------------------------------------------------------
start_link(Queue, GM, DeathFun, DepthFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []).
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun],
+ [{proc_name, Queue#amqqueue.name}]).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d9cef642..4f50e1a5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats,
Log("~p messages to synchronise", [BQ:len(BQS)]),
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
Ref = make_ref(),
- Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 96f89ecc..3027f62b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -71,7 +71,8 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(
+ ?MODULE, Q, [{proc_name, Q#amqqueue.name}]).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 61e90105..b4c409b2 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/7, slave/7]).
+-export([master_prepare/4, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -61,7 +61,8 @@
-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
bqs()}).
--spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(),
+ log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
@@ -80,9 +81,12 @@
%% ---------------------------------------------------------------------------
%% Master
-master_prepare(Ref, Log, SPids) ->
+master_prepare(Ref, QName, Log, SPids) ->
MPid = self(),
- spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+ spawn_link(fun () ->
+ rabbit_misc:store_proc_name(?MODULE, QName),
+ syncer(Ref, Log, MPid, SPids)
+ end).
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 00c4eaf3..80e160d9 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -70,6 +70,7 @@
-export([interval_operation/4]).
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
+-export([store_proc_name/1, store_proc_name/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -248,6 +249,8 @@
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
-spec(get_parent/0 :: () -> pid()).
+-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
+-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-endif.
%%----------------------------------------------------------------------------
@@ -1082,6 +1085,9 @@ stop_timer(State, Idx) ->
end
end.
+store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
+store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index e8c96818..401b8ab1 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -222,10 +222,9 @@ maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
maybe_ntoab(Host) -> Host.
rdns(Addr) ->
- {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups),
- case Lookup of
- true -> list_to_binary(rabbit_networking:tcp_host(Addr));
- _ -> Addr
+ case application:get_env(rabbit, reverse_dns_lookups) of
+ {ok, true} -> list_to_binary(rabbit_networking:tcp_host(Addr));
+ _ -> Addr
end.
sock_funs(inbound) -> {fun peername/1, fun sockname/1};
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 6406f7e9..1620f89a 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -16,9 +16,9 @@
-module(rabbit_queue_collector).
--behaviour(gen_server).
+-behaviour(gen_server2).
--export([start_link/0, register/2, delete_all/1]).
+-export([start_link/1, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
@@ -39,14 +40,14 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
+start_link(ProcName) ->
+ gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]).
register(CollectorPid, Q) ->
- gen_server:call(CollectorPid, {register, Q}, infinity).
+ gen_server2:call(CollectorPid, {register, Q}, infinity).
delete_all(CollectorPid) ->
- gen_server:call(CollectorPid, delete_all, infinity).
+ gen_server2:call(CollectorPid, delete_all, infinity).
%%----------------------------------------------------------------------------
@@ -78,7 +79,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason},
State = #state{monitors = QMons, delete_from = Deleting}) ->
QMons1 = pmon:erase(DownPid, QMons),
case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
- true -> gen_server:reply(Deleting, ok);
+ true -> gen_server2:reply(Deleting, ok);
false -> ok
end,
{noreply, State#state{monitors = QMons1}}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d9879f1b..6d70f1bd 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -214,6 +214,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
+ rabbit_misc:store_proc_name(?MODULE, list_to_binary(Name)),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -877,15 +878,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
frame_max, ?FRAME_MIN_SIZE, FrameMax),
ok = validate_negotiated_integer_value(
channel_max, ?CHANNEL_MIN, ChannelMax),
- {ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(
+ SupPid, Connection#connection.name),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
+ Heartbeater = rabbit_heartbeat:start(
+ SupPid, Sock, Connection#connection.name,
+ ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
frame_max = FrameMax,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5fe319d3..054db8ae 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1262,7 +1262,7 @@ test_writer(Pid) ->
test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], Me, Limiter),
@@ -2815,7 +2815,7 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
@@ -2842,7 +2842,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index a36613db..0edebff1 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -30,7 +30,8 @@
connection/0, protocol/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
- channel_exit/0, connection_exit/0, mfargs/0]).
+ channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
+ proc_type_and_name/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +157,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
+-type(proc_name() :: term()).
+-type(proc_type_and_name() :: {atom(), proc_name()}).
+
-endif. % use_specs
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 34dd3d3b..64ddb092 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, start/6, start_link/6]).
+-export([start/6, start_link/6, start/7, start_link/7]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -30,7 +30,7 @@
-export([internal_send_command/4, internal_send_command/6]).
%% internal
--export([mainloop/2, mainloop1/2]).
+-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
-record(wstate, {sock, channel, frame_max, protocol, reader,
stats_timer, pending}).
@@ -41,21 +41,25 @@
-ifdef(use_specs).
--spec(start/5 ::
+-spec(start/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name())
-> rabbit_types:ok(pid())).
--spec(start_link/5 ::
+-spec(start_link/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name())
-> rabbit_types:ok(pid())).
--spec(start/6 ::
+-spec(start/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
@@ -99,23 +103,23 @@
%%---------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
+ start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false).
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
+ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false).
-start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -138,6 +142,11 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+enter_mainloop(Identity, State) ->
+ Deb = sys:debug_options([]),
+ rabbit_misc:store_proc_name(?MODULE, Identity),
+ mainloop(Deb, State).
+
mainloop(Deb, State) ->
try
mainloop1(Deb, State)