diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-10 14:54:18 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-10 14:54:18 +0000 |
commit | 58bd26a768277206f24901167850bc9c79c9f106 (patch) | |
tree | 450676422714cbb6bcbb2eeec20bf4f911b5f9ca | |
parent | 97a842805979bdea82dcd88e3f043c5b19052e3c (diff) | |
parent | fe1201a959e3cebd642f04d4e30e0274a28cba4f (diff) | |
download | rabbitmq-server-58bd26a768277206f24901167850bc9c79c9f106.tar.gz |
Merge in default
-rw-r--r-- | src/gen_server2.erl | 27 | ||||
-rw-r--r-- | src/gm.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 20 | ||||
-rw-r--r-- | src/rabbit_connection_helper_sup.erl | 9 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 55 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 12 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_net.erl | 7 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 17 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 11 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_types.erl | 6 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 49 |
19 files changed, 158 insertions, 92 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 6690d181..b0212e47 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -81,6 +81,10 @@ %% process as sys:get_status/1 would). Pass through a function which %% can be invoked on the state, get back the result. The state is not %% modified. +%% +%% 10) The Options parameter to start / start_link can include +%% {proc_name, Name}. This name is stored in the process dictionary +%% for later debugging. %% All modifications are (C) 2009-2013 GoPivotal, Inc. @@ -283,7 +287,7 @@ behaviour_info(_Other) -> %%% Name ::= {local, atom()} | {global, atom()} %%% Mod ::= atom(), callback module implementing the 'real' server %%% Args ::= term(), init arguments (to Mod:init/1) -%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] +%%% Options ::= [{timeout, Timeout} | {debug, [Flag]} | {proc_name, term()}] %%% Flag ::= trace | log | {logfile, File} | statistics | debug %%% (debug == log && statistics) %%% Returns: {ok, Pid} | @@ -463,6 +467,10 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> mod = Mod, queue = Queue, debug = Debug }), + case opt(proc_name, Options) of + {ok, ProcName} -> put(process_name, {Mod, ProcName}); + false -> ok + end, case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -901,9 +909,17 @@ common_noreply(_Name, _NState, [] = _Debug) -> common_noreply(Name, NState, Debug) -> sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}). -common_become(_Name, _Mod, _NState, [] = _Debug) -> + +common_become(Name, OldMod, NewMod, NState, Debug) -> + case get(process_name) of + {OldMod, ProcName} -> put(process_name, {NewMod, ProcName}); + _ -> ok + end, + common_become0(Name, NewMod, NState, Debug). + +common_become0(_Name, _Mod, _NState, [] = _Debug) -> []; -common_become(Name, Mod, NState, Debug) -> +common_become0(Name, Mod, NState, Debug) -> sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}). handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, @@ -935,6 +951,7 @@ handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> handle_common_reply(Reply, Msg, GS2State). handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, + mod = Mod0, debug = Debug}) -> case Reply of {noreply, NState} -> @@ -948,14 +965,14 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, time = Time1, debug = Debug1}); {become, Mod, NState} -> - Debug1 = common_become(Name, Mod, NState, Debug), + Debug1 = common_become(Name, Mod0, Mod, NState, Debug), loop(find_prioritisers( GS2State #gs2_state { mod = Mod, state = NState, time = infinity, debug = Debug1 })); {become, Mod, NState, Time1} -> - Debug1 = common_become(Name, Mod, NState, Debug), + Debug1 = common_become(Name, Mod0, Mod, NState, Debug), loop(find_prioritisers( GS2State #gs2_state { mod = Mod, state = NState, @@ -517,7 +517,8 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + gen_server2:start_link( + ?MODULE, [GroupName, Module, Args, TxnFun], [{proc_name, GroupName}]). leave(Server) -> gen_server2:cast(Server, leave). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e5c283d0..9cb5d6d0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -99,7 +99,8 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> + gen_server2:start_link(?MODULE, Q, [{proc_name, Q#amqqueue.name}]). info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fe9faf86..0c786c07 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -119,7 +119,8 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, Limiter) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, CollectorPid, Limiter], []). + User, VHost, Capabilities, CollectorPid, Limiter], + [{proc_name, {ConnName, Channel}}]). do(Pid, Method) -> do(Pid, Method, none). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index df2e80ca..26f9700e 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,9 +47,9 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, - {tcp, Sock, Channel, FrameMax, - ReaderPid, Protocol}), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {tcp, Sock, Channel, FrameMax, + ReaderPid, Protocol, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), [WriterPid] = supervisor2:find_child(SupPid, writer), {ok, ChannelPid} = @@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, direct), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {direct, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), {ok, ChannelPid} = supervisor2:start_child( @@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, init(Type) -> {ok, {{one_for_all, 0, 1}, child_specs(Type)}}. -child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) -> +child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) -> [{writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid, true]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; -child_specs(direct) -> - [{limiter, {rabbit_limiter, start_link, []}, + [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} + | child_specs({direct, Identity})]; +child_specs({direct, Identity}) -> + [{limiter, {rabbit_limiter, start_link, [Identity]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}]. diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl index e51615e8..f268d8d6 100644 --- a/src/rabbit_connection_helper_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([start_channel_sup_sup/1, - start_queue_collector/1]). + start_queue_collector/2]). -export([init/1]). @@ -31,7 +31,8 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). --spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/2 :: (pid(), rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) -> {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). -start_queue_collector(SupPid) -> +start_queue_collector(SupPid, Identity) -> supervisor2:start_child( SupPid, - {collector, {rabbit_queue_collector, start_link, []}, + {collector, {rabbit_queue_collector, start_link, [Identity]}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ca67254b..ff9de67a 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,8 +16,8 @@ -module(rabbit_heartbeat). --export([start/6]). --export([start_heartbeat_sender/3, start_heartbeat_receiver/3, +-export([start/6, start/7]). +-export([start_heartbeat_sender/4, start_heartbeat_receiver/4, pause_monitor/1, resume_monitor/1]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -39,12 +39,17 @@ non_neg_integer(), heartbeat_callback(), non_neg_integer(), heartbeat_callback()) -> heartbeaters()). --spec(start_heartbeat_sender/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). --spec(start_heartbeat_receiver/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). +-spec(start/7 :: + (pid(), rabbit_net:socket(), rabbit_types:proc_name(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). + +-spec(start_heartbeat_sender/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -56,31 +61,35 @@ -endif. %%---------------------------------------------------------------------------- - start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + start(SupPid, Sock, unknown, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun). + +start(SupPid, Sock, Identity, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, - start_heartbeat_sender), + start_heartbeat_sender, Identity), {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, ReceiveFun, heartbeat_receiver, - start_heartbeat_receiver), + start_heartbeat_receiver, Identity), {Sender, Receiver}. -start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> SendFun(), continue end}). + fun () -> SendFun(), continue end}, Identity). -start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> ReceiveFun(), stop end}). + fun () -> ReceiveFun(), stop end}, Identity). pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. @@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. %%---------------------------------------------------------------------------- -start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback, + _Identity) -> {ok, none}; -start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback, + Identity) -> supervisor2:start_child( SupPid, {Name, - {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]}, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). -heartbeater(Params) -> +heartbeater(Params, Identity) -> Deb = sys:debug_options([]), - {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}. + {ok, proc_lib:spawn_link(fun () -> + rabbit_misc:store_proc_name(Identity), + heartbeater(Params, Deb, {0, 0}) + end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Deb, {StatVal, SameCount} = State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d5cfbce6..fb81dbf5 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -119,7 +119,7 @@ -behaviour(gen_server2). --export([start_link/0]). +-export([start_link/1]). %% channel API -export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, is_prefetch_limited/1, is_blocked/1, is_active/1, @@ -145,7 +145,8 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). -spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) @@ -193,7 +194,8 @@ %% API %%---------------------------------------------------------------------------- -start_link() -> gen_server2:start_link(?MODULE, [], []). +start_link(ProcName) -> + gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index a0e8bcc6..0a034b6d 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -310,7 +310,8 @@ %%---------------------------------------------------------------------------- start_link(Queue, GM, DeathFun, DepthFun) -> - gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []). + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], + [{proc_name, Queue#amqqueue.name}]). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index d9cef642..4f50e1a5 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats, Log("~p messages to synchronise", [BQ:len(BQS)]), {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), Ref = make_ref(), - Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 96f89ecc..3027f62b 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -71,7 +71,8 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link( + ?MODULE, Q, [{proc_name, Q#amqqueue.name}]). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 61e90105..b4c409b2 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/7, slave/7]). +-export([master_prepare/4, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -61,7 +61,8 @@ -type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), bqs()}). --spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). +-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(), + log_fun(), [pid()]) -> pid()). -spec(master_go/7 :: (pid(), reference(), log_fun(), rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), @@ -80,9 +81,12 @@ %% --------------------------------------------------------------------------- %% Master -master_prepare(Ref, Log, SPids) -> +master_prepare(Ref, QName, Log, SPids) -> MPid = self(), - spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). + spawn_link(fun () -> + rabbit_misc:store_proc_name(?MODULE, QName), + syncer(Ref, Log, MPid, SPids) + end). master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 00c4eaf3..80e160d9 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -70,6 +70,7 @@ -export([interval_operation/4]). -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). +-export([store_proc_name/1, store_proc_name/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -248,6 +249,8 @@ -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). -spec(get_parent/0 :: () -> pid()). +-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). +-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -endif. %%---------------------------------------------------------------------------- @@ -1082,6 +1085,9 @@ stop_timer(State, Idx) -> end end. +store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). +store_proc_name(TypeProcName) -> put(process_name, TypeProcName). + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index e8c96818..401b8ab1 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -222,10 +222,9 @@ maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr); maybe_ntoab(Host) -> Host. rdns(Addr) -> - {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups), - case Lookup of - true -> list_to_binary(rabbit_networking:tcp_host(Addr)); - _ -> Addr + case application:get_env(rabbit, reverse_dns_lookups) of + {ok, true} -> list_to_binary(rabbit_networking:tcp_host(Addr)); + _ -> Addr end. sock_funs(inbound) -> {fun peername/1, fun sockname/1}; diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 6406f7e9..1620f89a 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -16,9 +16,9 @@ -module(rabbit_queue_collector). --behaviour(gen_server). +-behaviour(gen_server2). --export([start_link/0, register/2, delete_all/1]). +-export([start_link/1, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -31,7 +31,8 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). @@ -39,14 +40,14 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link(?MODULE, [], []). +start_link(ProcName) -> + gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]). register(CollectorPid, Q) -> - gen_server:call(CollectorPid, {register, Q}, infinity). + gen_server2:call(CollectorPid, {register, Q}, infinity). delete_all(CollectorPid) -> - gen_server:call(CollectorPid, delete_all, infinity). + gen_server2:call(CollectorPid, delete_all, infinity). %%---------------------------------------------------------------------------- @@ -78,7 +79,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors = QMons, delete_from = Deleting}) -> QMons1 = pmon:erase(DownPid, QMons), case Deleting =/= undefined andalso pmon:is_empty(QMons1) of - true -> gen_server:reply(Deleting, ok); + true -> gen_server2:reply(Deleting, ok); false -> ok end, {noreply, State#state{monitors = QMons1}}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d9879f1b..6d70f1bd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -214,6 +214,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), + rabbit_misc:store_proc_name(?MODULE, list_to_binary(Name)), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -877,15 +878,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, frame_max, ?FRAME_MIN_SIZE, FrameMax), ok = validate_negotiated_integer_value( channel_max, ?CHANNEL_MIN, ChannelMax), - {ok, Collector} = - rabbit_connection_helper_sup:start_queue_collector(SupPid), + {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector( + SupPid, Connection#connection.name), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, ReceiveFun), + Heartbeater = rabbit_heartbeat:start( + SupPid, Sock, Connection#connection.name, + ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ frame_max = FrameMax, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5fe319d3..054db8ae 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1262,7 +1262,7 @@ test_writer(Pid) -> test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], Me, Limiter), @@ -2815,7 +2815,7 @@ test_queue_recover() -> end, rabbit_amqqueue:stop(), rabbit_amqqueue:start(rabbit_amqqueue:recover()), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> @@ -2842,7 +2842,7 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:set_ram_duration_target(QPid, 0), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index a36613db..0edebff1 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -30,7 +30,8 @@ connection/0, protocol/0, user/0, internal_user/0, username/0, password/0, password_hash/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, - channel_exit/0, connection_exit/0, mfargs/0]). + channel_exit/0, connection_exit/0, mfargs/0, proc_name/0, + proc_type_and_name/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +157,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). +-type(proc_name() :: term()). +-type(proc_type_and_name() :: {atom(), proc_name()}). + -endif. % use_specs diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 34dd3d3b..64ddb092 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, start/6, start_link/6]). +-export([start/6, start_link/6, start/7, start_link/7]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -30,7 +30,7 @@ -export([internal_send_command/4, internal_send_command/6]). %% internal --export([mainloop/2, mainloop1/2]). +-export([enter_mainloop/2, mainloop/2, mainloop1/2]). -record(wstate, {sock, channel, frame_max, protocol, reader, stats_timer, pending}). @@ -41,21 +41,25 @@ -ifdef(use_specs). --spec(start/5 :: +-spec(start/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name()) -> rabbit_types:ok(pid())). --spec(start_link/5 :: +-spec(start_link/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name()) -> rabbit_types:ok(pid())). --spec(start/6 :: +-spec(start/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid())). --spec(start_link/6 :: +-spec(start_link/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid())). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). @@ -99,23 +103,23 @@ %%--------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - start(Sock, Channel, FrameMax, Protocol, ReaderPid, false). +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> + start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false). +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> + start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). -start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - Deb = sys:debug_options([]), - {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}. + {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}. -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - Deb = sys:debug_options([]), - {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}. + {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}. initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> (case ReaderWantsStats of @@ -138,6 +142,11 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +enter_mainloop(Identity, State) -> + Deb = sys:debug_options([]), + rabbit_misc:store_proc_name(?MODULE, Identity), + mainloop(Deb, State). + mainloop(Deb, State) -> try mainloop1(Deb, State) |