summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-12-10 11:34:29 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-12-10 11:34:29 +0000
commitb5224659d3efcee5ba57b1dc4897a91595db7e4a (patch)
treef11706d9844a30f6c0f77ed34c6638478574dd2c
parent2f797f1991a6e98cad4a0a85a9580d71542b4858 (diff)
downloadrabbitmq-server-b5224659d3efcee5ba57b1dc4897a91595db7e4a.tar.gz
A bit more abstraction, plus identities for heartbeater /
queue_collector / limiter.
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_connection_helper_sup.erl9
-rw-r--r--src/rabbit_heartbeat.erl55
-rw-r--r--src/rabbit_limiter.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_reader.erl10
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_writer.erl21
13 files changed, 100 insertions, 65 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 049df840..2c2b747f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -124,7 +124,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
- put(rabbit_process_name, {queue, Q#amqqueue.name}),
+ rabbit_misc:store_identity(queue, Q#amqqueue.name),
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dbf66dcf..00ff1f43 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
- put(rabbit_process_name, {channel, {ConnName, Channel}}),
+ rabbit_misc:store_identity(channel, {ConnName, Channel}),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 9691eaa4..26f9700e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,9 +47,9 @@
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE,
- {tcp, Sock, Channel, FrameMax,
- ReaderPid, Protocol, ConnName}),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {tcp, Sock, Channel, FrameMax,
+ ReaderPid, Protocol, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
[WriterPid] = supervisor2:find_child(SupPid, writer),
{ok, ChannelPid} =
@@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE, direct),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
{ok, ChannelPid} =
supervisor2:start_child(
@@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
init(Type) ->
{ok, {{one_for_all, 0, 1}, child_specs(Type)}}.
-child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, ConnName}) ->
+child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) ->
[{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid, ConnName, true]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
-child_specs(direct) ->
- [{limiter, {rabbit_limiter, start_link, []},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}
+ | child_specs({direct, Identity})];
+child_specs({direct, Identity}) ->
+ [{limiter, {rabbit_limiter, start_link, [Identity]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]}].
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index e51615e8..a9578748 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -20,7 +20,7 @@
-export([start_link/0]).
-export([start_channel_sup_sup/1,
- start_queue_collector/1]).
+ start_queue_collector/2]).
-export([init/1]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
--spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/2 :: (pid(), rabbit_types:identity()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) ->
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
-start_queue_collector(SupPid) ->
+start_queue_collector(SupPid, Identity) ->
supervisor2:start_child(
SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
+ {collector, {rabbit_queue_collector, start_link, [Identity]},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ca67254b..cbe446c2 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,8 @@
-module(rabbit_heartbeat).
--export([start/6]).
--export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+-export([start/6, start/7]).
+-export([start_heartbeat_sender/4, start_heartbeat_receiver/4,
pause_monitor/1, resume_monitor/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -39,12 +39,17 @@
non_neg_integer(), heartbeat_callback(),
non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
--spec(start_heartbeat_sender/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
--spec(start_heartbeat_receiver/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
+-spec(start/7 ::
+ (pid(), rabbit_net:socket(), rabbit_types:identity(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
+
+-spec(start_heartbeat_sender/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:type_identity()) -> rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:type_identity()) -> rabbit_types:ok(pid())).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -56,31 +61,35 @@
-endif.
%%----------------------------------------------------------------------------
-
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ start(SupPid, Sock, unknown,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun).
+
+start(SupPid, Sock, Identity,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
{ok, Sender} =
start_heartbeater(SendTimeoutSec, SupPid, Sock,
SendFun, heartbeat_sender,
- start_heartbeat_sender),
+ start_heartbeat_sender, Identity),
{ok, Receiver} =
start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
+ start_heartbeat_receiver, Identity),
{Sender, Receiver}.
-start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () -> SendFun(), continue end}).
+ fun () -> SendFun(), continue end}, Identity).
-start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () -> ReceiveFun(), stop end}).
+ fun () -> ReceiveFun(), stop end}, Identity).
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
@@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
%%----------------------------------------------------------------------------
-start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback,
+ _Identity) ->
{ok, none};
-start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback,
+ Identity) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
-heartbeater(Params) ->
+heartbeater(Params, Identity) ->
Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}.
+ {ok, proc_lib:spawn_link(fun () ->
+ rabbit_misc:store_identity(Identity),
+ heartbeater(Params, Deb, {0, 0})
+ end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Deb, {StatVal, SameCount} = State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 22da465b..bce3f531 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -119,7 +119,7 @@
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/1]).
%% channel API
-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
is_prefetch_limited/1, is_blocked/1, is_active/1,
@@ -145,7 +145,8 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:identity()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
@@ -193,7 +194,8 @@
%% API
%%----------------------------------------------------------------------------
-start_link() -> gen_server2:start_link(?MODULE, [], []).
+start_link(Identity) ->
+ gen_server2:start_link(?MODULE, [Identity], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
@@ -320,7 +322,9 @@ update_credit(CTag, Credit, Drain, Credits) ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) -> {ok, #lim{}}.
+init([Identity]) ->
+ rabbit_misc:store_identity(limiter, Identity),
+ {ok, #lim{}}.
prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
prioritise_call(_Msg, _From, _Len, _State) -> 0.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 211b99c0..b1d334a9 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -79,7 +79,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
init(Q) ->
- put(rabbit_process_name, {queue_slave, Q#amqqueue.name}),
+ rabbit_misc:store_identity(queue_slave, Q#amqqueue.name),
{ok, {not_started, Q}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
@@ -617,7 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
- put(rabbit_process_name, {queue, QName}),
+ rabbit_misc:store_identity(queue, QName),
rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 00c4eaf3..8244ea3f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -70,6 +70,7 @@
-export([interval_operation/4]).
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
+-export([store_identity/1, store_identity/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -248,6 +249,8 @@
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
-spec(get_parent/0 :: () -> pid()).
+-spec(store_identity/2 :: (atom(), rabbit_types:identity()) -> ok).
+-spec(store_identity/1 :: (rabbit_types:type_identity()) -> ok).
-endif.
%%----------------------------------------------------------------------------
@@ -1082,6 +1085,9 @@ stop_timer(State, Idx) ->
end
end.
+store_identity(Type, Identity) -> store_identity({Type, Identity}).
+store_identity(TypeIdentity) -> put(rabbit_process_name, TypeIdentity).
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 6406f7e9..629c2bb2 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server).
--export([start_link/0, register/2, delete_all/1]).
+-export([start_link/1, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:identity()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
@@ -39,8 +40,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
+start_link(Identity) ->
+ gen_server:start_link(?MODULE, [Identity], []).
register(CollectorPid, Q) ->
gen_server:call(CollectorPid, {register, Q}, infinity).
@@ -50,7 +51,8 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
-init([]) ->
+init([Identity]) ->
+ rabbit_misc:store_identity(queue_collector, Identity),
{ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 162409d0..4f058034 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -212,7 +212,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
- put(rabbit_process_name, {reader, list_to_binary(Name)}),
+ rabbit_misc:store_identity(reader, list_to_binary(Name)),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -855,14 +855,16 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
[FrameMax, ServerFrameMax]);
true ->
{ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ rabbit_connection_helper_sup:start_queue_collector(
+ SupPid, Connection#connection.name),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
+ rabbit_heartbeat:start(
+ SupPid, Sock, Connection#connection.name,
+ ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5fe319d3..054db8ae 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1262,7 +1262,7 @@ test_writer(Pid) ->
test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], Me, Limiter),
@@ -2815,7 +2815,7 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
@@ -2842,7 +2842,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index a36613db..37d4acf3 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -30,7 +30,8 @@
connection/0, protocol/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
- channel_exit/0, connection_exit/0, mfargs/0]).
+ channel_exit/0, connection_exit/0, mfargs/0, identity/0,
+ type_identity/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +157,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
+-type(identity() :: term()).
+-type(type_identity() :: {atom(), identity()}).
+
-endif. % use_specs
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 412be52a..433bac24 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -51,11 +51,13 @@
-> rabbit_types:ok(pid())).
-spec(start/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean(), string())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:identity(), boolean())
-> rabbit_types:ok(pid())).
-spec(start_link/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean(), string())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:identity(), boolean())
-> rabbit_types:ok(pid())).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
@@ -105,17 +107,17 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, unknown, false).
-start(Sock, Channel, FrameMax, Protocol, ReaderPid, ConnName,
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn(?MODULE, enter_mainloop, [ConnName, State])}.
+ {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ConnName,
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [ConnName, State])}.
+ {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -138,12 +140,9 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
-enter_mainloop(ConnName, State = #wstate{channel = Channel}) ->
+enter_mainloop(Identity, State) ->
Deb = sys:debug_options([]),
- put(rabbit_process_name, case ConnName of
- unknown -> writer;
- _ -> {writer, {ConnName, Channel}}
- end),
+ rabbit_misc:store_identity(writer, Identity),
mainloop(Deb, State).
mainloop(Deb, State) ->