diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-12-10 11:34:29 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-12-10 11:34:29 +0000 |
commit | b5224659d3efcee5ba57b1dc4897a91595db7e4a (patch) | |
tree | f11706d9844a30f6c0f77ed34c6638478574dd2c | |
parent | 2f797f1991a6e98cad4a0a85a9580d71542b4858 (diff) | |
download | rabbitmq-server-b5224659d3efcee5ba57b1dc4897a91595db7e4a.tar.gz |
A bit more abstraction, plus identities for heartbeater /
queue_collector / limiter.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 20 | ||||
-rw-r--r-- | src/rabbit_connection_helper_sup.erl | 9 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 55 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 12 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 10 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_types.erl | 6 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 21 |
13 files changed, 100 insertions, 65 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 049df840..2c2b747f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -124,7 +124,7 @@ info_keys() -> ?INFO_KEYS. init(Q) -> process_flag(trap_exit, true), - put(rabbit_process_name, {queue, Q#amqqueue.name}), + rabbit_misc:store_identity(queue, Q#amqqueue.name), {ok, init_state(Q#amqqueue{pid = self()}), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dbf66dcf..00ff1f43 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), - put(rabbit_process_name, {channel, {ConnName, Channel}}), + rabbit_misc:store_identity(channel, {ConnName, Channel}), State = #ch{state = starting, protocol = Protocol, channel = Channel, diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9691eaa4..26f9700e 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,9 +47,9 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, - {tcp, Sock, Channel, FrameMax, - ReaderPid, Protocol, ConnName}), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {tcp, Sock, Channel, FrameMax, + ReaderPid, Protocol, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), [WriterPid] = supervisor2:find_child(SupPid, writer), {ok, ChannelPid} = @@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, direct), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {direct, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), {ok, ChannelPid} = supervisor2:start_child( @@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, init(Type) -> {ok, {{one_for_all, 0, 1}, child_specs(Type)}}. -child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, ConnName}) -> +child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) -> [{writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid, ConnName, true]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; -child_specs(direct) -> - [{limiter, {rabbit_limiter, start_link, []}, + [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} + | child_specs({direct, Identity})]; +child_specs({direct, Identity}) -> + [{limiter, {rabbit_limiter, start_link, [Identity]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}]. diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl index e51615e8..a9578748 100644 --- a/src/rabbit_connection_helper_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([start_channel_sup_sup/1, - start_queue_collector/1]). + start_queue_collector/2]). -export([init/1]). @@ -31,7 +31,8 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). --spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/2 :: (pid(), rabbit_types:identity()) -> + rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) -> {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). -start_queue_collector(SupPid) -> +start_queue_collector(SupPid, Identity) -> supervisor2:start_child( SupPid, - {collector, {rabbit_queue_collector, start_link, []}, + {collector, {rabbit_queue_collector, start_link, [Identity]}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ca67254b..cbe446c2 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,8 +16,8 @@ -module(rabbit_heartbeat). --export([start/6]). --export([start_heartbeat_sender/3, start_heartbeat_receiver/3, +-export([start/6, start/7]). +-export([start_heartbeat_sender/4, start_heartbeat_receiver/4, pause_monitor/1, resume_monitor/1]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -39,12 +39,17 @@ non_neg_integer(), heartbeat_callback(), non_neg_integer(), heartbeat_callback()) -> heartbeaters()). --spec(start_heartbeat_sender/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). --spec(start_heartbeat_receiver/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). +-spec(start/7 :: + (pid(), rabbit_net:socket(), rabbit_types:identity(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). + +-spec(start_heartbeat_sender/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:type_identity()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:type_identity()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -56,31 +61,35 @@ -endif. %%---------------------------------------------------------------------------- - start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + start(SupPid, Sock, unknown, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun). + +start(SupPid, Sock, Identity, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, - start_heartbeat_sender), + start_heartbeat_sender, Identity), {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, ReceiveFun, heartbeat_receiver, - start_heartbeat_receiver), + start_heartbeat_receiver, Identity), {Sender, Receiver}. -start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> SendFun(), continue end}). + fun () -> SendFun(), continue end}, Identity). -start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> ReceiveFun(), stop end}). + fun () -> ReceiveFun(), stop end}, Identity). pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. @@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. %%---------------------------------------------------------------------------- -start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback, + _Identity) -> {ok, none}; -start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback, + Identity) -> supervisor2:start_child( SupPid, {Name, - {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]}, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). -heartbeater(Params) -> +heartbeater(Params, Identity) -> Deb = sys:debug_options([]), - {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}. + {ok, proc_lib:spawn_link(fun () -> + rabbit_misc:store_identity(Identity), + heartbeater(Params, Deb, {0, 0}) + end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Deb, {StatVal, SameCount} = State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 22da465b..bce3f531 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -119,7 +119,7 @@ -behaviour(gen_server2). --export([start_link/0]). +-export([start_link/1]). %% channel API -export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, is_prefetch_limited/1, is_blocked/1, is_active/1, @@ -145,7 +145,8 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:identity()) -> + rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). -spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) @@ -193,7 +194,8 @@ %% API %%---------------------------------------------------------------------------- -start_link() -> gen_server2:start_link(?MODULE, [], []). +start_link(Identity) -> + gen_server2:start_link(?MODULE, [Identity], []). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. @@ -320,7 +322,9 @@ update_credit(CTag, Credit, Drain, Credits) -> %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> {ok, #lim{}}. +init([Identity]) -> + rabbit_misc:store_identity(limiter, Identity), + {ok, #lim{}}. prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; prioritise_call(_Msg, _From, _Len, _State) -> 0. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 211b99c0..b1d334a9 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -79,7 +79,7 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). init(Q) -> - put(rabbit_process_name, {queue_slave, Q#amqqueue.name}), + rabbit_misc:store_identity(queue_slave, Q#amqqueue.name), {ok, {not_started, Q}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -617,7 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, KS1 = lists:foldl(fun (ChPid0, KS0) -> pmon:demonitor(ChPid0, KS0) end, KS, AwaitGmDown), - put(rabbit_process_name, {queue, QName}), + rabbit_misc:store_identity(queue, QName), rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 00c4eaf3..8244ea3f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -70,6 +70,7 @@ -export([interval_operation/4]). -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). +-export([store_identity/1, store_identity/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -248,6 +249,8 @@ -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). -spec(get_parent/0 :: () -> pid()). +-spec(store_identity/2 :: (atom(), rabbit_types:identity()) -> ok). +-spec(store_identity/1 :: (rabbit_types:type_identity()) -> ok). -endif. %%---------------------------------------------------------------------------- @@ -1082,6 +1085,9 @@ stop_timer(State, Idx) -> end end. +store_identity(Type, Identity) -> store_identity({Type, Identity}). +store_identity(TypeIdentity) -> put(rabbit_process_name, TypeIdentity). + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 6406f7e9..629c2bb2 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --export([start_link/0, register/2, delete_all/1]). +-export([start_link/1, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -31,7 +31,8 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:identity()) -> + rabbit_types:ok_pid_or_error()). -spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). @@ -39,8 +40,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link(?MODULE, [], []). +start_link(Identity) -> + gen_server:start_link(?MODULE, [Identity], []). register(CollectorPid, Q) -> gen_server:call(CollectorPid, {register, Q}, infinity). @@ -50,7 +51,8 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- -init([]) -> +init([Identity]) -> + rabbit_misc:store_identity(queue_collector, Identity), {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 162409d0..4f058034 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -212,7 +212,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), - put(rabbit_process_name, {reader, list_to_binary(Name)}), + rabbit_misc:store_identity(reader, list_to_binary(Name)), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -855,14 +855,16 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, [FrameMax, ServerFrameMax]); true -> {ok, Collector} = - rabbit_connection_helper_sup:start_queue_collector(SupPid), + rabbit_connection_helper_sup:start_queue_collector( + SupPid, Connection#connection.name), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), ReceiveFun = fun() -> Parent ! heartbeat_timeout end, Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, ReceiveFun), + rabbit_heartbeat:start( + SupPid, Sock, Connection#connection.name, + ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5fe319d3..054db8ae 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1262,7 +1262,7 @@ test_writer(Pid) -> test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], Me, Limiter), @@ -2815,7 +2815,7 @@ test_queue_recover() -> end, rabbit_amqqueue:stop(), rabbit_amqqueue:start(rabbit_amqqueue:recover()), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> @@ -2842,7 +2842,7 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:set_ram_duration_target(QPid, 0), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index a36613db..37d4acf3 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -30,7 +30,8 @@ connection/0, protocol/0, user/0, internal_user/0, username/0, password/0, password_hash/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, - channel_exit/0, connection_exit/0, mfargs/0]). + channel_exit/0, connection_exit/0, mfargs/0, identity/0, + type_identity/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +157,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). +-type(identity() :: term()). +-type(type_identity() :: {atom(), identity()}). + -endif. % use_specs diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 412be52a..433bac24 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -51,11 +51,13 @@ -> rabbit_types:ok(pid())). -spec(start/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean(), string()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:identity(), boolean()) -> rabbit_types:ok(pid())). -spec(start_link/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean(), string()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:identity(), boolean()) -> rabbit_types:ok(pid())). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). @@ -105,17 +107,17 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, unknown, false). -start(Sock, Channel, FrameMax, Protocol, ReaderPid, ConnName, +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn(?MODULE, enter_mainloop, [ConnName, State])}. + {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}. -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ConnName, +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [ConnName, State])}. + {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}. initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> (case ReaderWantsStats of @@ -138,12 +140,9 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. -enter_mainloop(ConnName, State = #wstate{channel = Channel}) -> +enter_mainloop(Identity, State) -> Deb = sys:debug_options([]), - put(rabbit_process_name, case ConnName of - unknown -> writer; - _ -> {writer, {ConnName, Channel}} - end), + rabbit_misc:store_identity(writer, Identity), mainloop(Deb, State). mainloop(Deb, State) -> |