summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-12-10 12:42:03 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-12-10 12:42:03 +0000
commitd7b708c3a096bd3f355dee03af4d064f8aec4c47 (patch)
tree71cc8f5a140b28993990b25069767e550324d461
parenta3a42f14505cb1e366a931a74ae21a29a6aee444 (diff)
downloadrabbitmq-server-d7b708c3a096bd3f355dee03af4d064f8aec4c47.tar.gz
Add queue_coordinator / queue_syncer / gm. Shift to getting gen_server2 to store the name in most cases.
-rw-r--r--src/gen_server2.erl6
-rw-r--r--src/gm.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_limiter.erl7
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl6
-rw-r--r--src/rabbit_mirror_queue_sync.erl12
-rw-r--r--src/rabbit_misc.erl4
-rw-r--r--src/rabbit_queue_collector.erl15
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_writer.erl2
13 files changed, 39 insertions, 31 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 6690d181..37587dc3 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -283,7 +283,7 @@ behaviour_info(_Other) ->
%%% Name ::= {local, atom()} | {global, atom()}
%%% Mod ::= atom(), callback module implementing the 'real' server
%%% Args ::= term(), init arguments (to Mod:init/1)
-%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%% Options ::= [{timeout, Timeout} | {debug, [Flag]} | {proc_name, term()}]
%%% Flag ::= trace | log | {logfile, File} | statistics | debug
%%% (debug == log && statistics)
%%% Returns: {ok, Pid} |
@@ -463,6 +463,10 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
mod = Mod,
queue = Queue,
debug = Debug }),
+ case opt(proc_name, Options) of
+ {ok, ProcName} -> put(process_name, {Mod, ProcName});
+ false -> ok
+ end,
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
diff --git a/src/gm.erl b/src/gm.erl
index 098d84fa..2b4a3841 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -517,7 +517,8 @@ table_definitions() ->
[{Name, [?TABLE_MATCH | Attributes]}].
start_link(GroupName, Module, Args, TxnFun) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
+ gen_server2:start_link(
+ ?MODULE, [GroupName, Module, Args, TxnFun], [{proc_name, GroupName}]).
leave(Server) ->
gen_server2:cast(Server, leave).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index dd3d09d5..39863807 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -116,7 +116,8 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, Q, [{proc_name, Q#amqqueue.name}]).
info_keys() -> ?INFO_KEYS.
@@ -124,7 +125,6 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
- rabbit_misc:store_proc_name(queue, Q#amqqueue.name),
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ace9d112..5ea7fe4d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -119,7 +119,8 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, CollectorPid, Limiter) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, CollectorPid, Limiter], []).
+ User, VHost, Capabilities, CollectorPid, Limiter],
+ [{proc_name, {ConnName, Channel}}]).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -195,7 +196,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
- rabbit_misc:store_proc_name(channel, {ConnName, Channel}),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 359e9261..6aca34ae 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -194,8 +194,8 @@
%% API
%%----------------------------------------------------------------------------
-start_link(Identity) ->
- gen_server2:start_link(?MODULE, [Identity], []).
+start_link(ProcName) ->
+ gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
@@ -322,8 +322,7 @@ update_credit(CTag, Credit, Drain, Credits) ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Identity]) ->
- rabbit_misc:store_proc_name(limiter, Identity),
+init([]) ->
{ok, #lim{}}.
prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index a0e8bcc6..0a034b6d 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -310,7 +310,8 @@
%%----------------------------------------------------------------------------
start_link(Queue, GM, DeathFun, DepthFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []).
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun],
+ [{proc_name, Queue#amqqueue.name}]).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d9cef642..4f50e1a5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats,
Log("~p messages to synchronise", [BQ:len(BQS)]),
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
Ref = make_ref(),
- Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9a7509c4..3b91b39d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -71,7 +71,8 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(
+ ?MODULE, Q, [{proc_name, Q#amqqueue.name}]).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
@@ -79,7 +80,6 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
init(Q) ->
- rabbit_misc:store_proc_name(queue_slave, Q#amqqueue.name),
{ok, {not_started, Q}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
@@ -617,7 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
- rabbit_misc:store_proc_name(queue, QName),
+ rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName),
rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 61e90105..b4c409b2 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/7, slave/7]).
+-export([master_prepare/4, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -61,7 +61,8 @@
-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
bqs()}).
--spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(),
+ log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
@@ -80,9 +81,12 @@
%% ---------------------------------------------------------------------------
%% Master
-master_prepare(Ref, Log, SPids) ->
+master_prepare(Ref, QName, Log, SPids) ->
MPid = self(),
- spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+ spawn_link(fun () ->
+ rabbit_misc:store_proc_name(?MODULE, QName),
+ syncer(Ref, Log, MPid, SPids)
+ end).
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5caa340e..80e160d9 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -1085,8 +1085,8 @@ stop_timer(State, Idx) ->
end
end.
-store_proc_name(Type, Identity) -> store_proc_name({Type, Identity}).
-store_proc_name(TypeIdentity) -> put(process_name, TypeIdentity).
+store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
+store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 73ef0eb2..1620f89a 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_collector).
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([start_link/1, register/2, delete_all/1]).
@@ -40,19 +40,18 @@
%%----------------------------------------------------------------------------
-start_link(Identity) ->
- gen_server:start_link(?MODULE, [Identity], []).
+start_link(ProcName) ->
+ gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]).
register(CollectorPid, Q) ->
- gen_server:call(CollectorPid, {register, Q}, infinity).
+ gen_server2:call(CollectorPid, {register, Q}, infinity).
delete_all(CollectorPid) ->
- gen_server:call(CollectorPid, delete_all, infinity).
+ gen_server2:call(CollectorPid, delete_all, infinity).
%%----------------------------------------------------------------------------
-init([Identity]) ->
- rabbit_misc:store_proc_name(queue_collector, Identity),
+init([]) ->
{ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
@@ -80,7 +79,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason},
State = #state{monitors = QMons, delete_from = Deleting}) ->
QMons1 = pmon:erase(DownPid, QMons),
case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
- true -> gen_server:reply(Deleting, ok);
+ true -> gen_server2:reply(Deleting, ok);
false -> ok
end,
{noreply, State#state{monitors = QMons1}}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e510f3f1..b3b341c5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -213,7 +213,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
- rabbit_misc:store_proc_name(reader, list_to_binary(Name)),
+ rabbit_misc:store_proc_name(?MODULE, list_to_binary(Name)),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index b1d4c5bd..3bb0c3fd 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -142,7 +142,7 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
enter_mainloop(Identity, State) ->
Deb = sys:debug_options([]),
- rabbit_misc:store_proc_name(writer, Identity),
+ rabbit_misc:store_proc_name(?MODULE, Identity),
mainloop(Deb, State).
mainloop(Deb, State) ->