diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-12-10 12:42:03 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-12-10 12:42:03 +0000 |
commit | d7b708c3a096bd3f355dee03af4d064f8aec4c47 (patch) | |
tree | 71cc8f5a140b28993990b25069767e550324d461 | |
parent | a3a42f14505cb1e366a931a74ae21a29a6aee444 (diff) | |
download | rabbitmq-server-d7b708c3a096bd3f355dee03af4d064f8aec4c47.tar.gz |
Add queue_coordinator / queue_syncer / gm. Shift to getting gen_server2 to store the name in most cases.
-rw-r--r-- | src/gen_server2.erl | 6 | ||||
-rw-r--r-- | src/gm.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 12 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 4 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 15 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 2 |
13 files changed, 39 insertions, 31 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 6690d181..37587dc3 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -283,7 +283,7 @@ behaviour_info(_Other) -> %%% Name ::= {local, atom()} | {global, atom()} %%% Mod ::= atom(), callback module implementing the 'real' server %%% Args ::= term(), init arguments (to Mod:init/1) -%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] +%%% Options ::= [{timeout, Timeout} | {debug, [Flag]} | {proc_name, term()}] %%% Flag ::= trace | log | {logfile, File} | statistics | debug %%% (debug == log && statistics) %%% Returns: {ok, Pid} | @@ -463,6 +463,10 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> mod = Mod, queue = Queue, debug = Debug }), + case opt(proc_name, Options) of + {ok, ProcName} -> put(process_name, {Mod, ProcName}); + false -> ok + end, case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -517,7 +517,8 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + gen_server2:start_link( + ?MODULE, [GroupName, Module, Args, TxnFun], [{proc_name, GroupName}]). leave(Server) -> gen_server2:cast(Server, leave). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index dd3d09d5..39863807 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -116,7 +116,8 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> + gen_server2:start_link(?MODULE, Q, [{proc_name, Q#amqqueue.name}]). info_keys() -> ?INFO_KEYS. @@ -124,7 +125,6 @@ info_keys() -> ?INFO_KEYS. init(Q) -> process_flag(trap_exit, true), - rabbit_misc:store_proc_name(queue, Q#amqqueue.name), {ok, init_state(Q#amqqueue{pid = self()}), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ace9d112..5ea7fe4d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -119,7 +119,8 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, Limiter) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, CollectorPid, Limiter], []). + User, VHost, Capabilities, CollectorPid, Limiter], + [{proc_name, {ConnName, Channel}}]). do(Pid, Method) -> do(Pid, Method, none). @@ -195,7 +196,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), - rabbit_misc:store_proc_name(channel, {ConnName, Channel}), State = #ch{state = starting, protocol = Protocol, channel = Channel, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 359e9261..6aca34ae 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -194,8 +194,8 @@ %% API %%---------------------------------------------------------------------------- -start_link(Identity) -> - gen_server2:start_link(?MODULE, [Identity], []). +start_link(ProcName) -> + gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. @@ -322,8 +322,7 @@ update_credit(CTag, Credit, Drain, Credits) -> %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Identity]) -> - rabbit_misc:store_proc_name(limiter, Identity), +init([]) -> {ok, #lim{}}. prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index a0e8bcc6..0a034b6d 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -310,7 +310,8 @@ %%---------------------------------------------------------------------------- start_link(Queue, GM, DeathFun, DepthFun) -> - gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []). + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], + [{proc_name, Queue#amqqueue.name}]). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index d9cef642..4f50e1a5 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats, Log("~p messages to synchronise", [BQ:len(BQS)]), {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), Ref = make_ref(), - Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9a7509c4..3b91b39d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -71,7 +71,8 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link( + ?MODULE, Q, [{proc_name, Q#amqqueue.name}]). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). @@ -79,7 +80,6 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). init(Q) -> - rabbit_misc:store_proc_name(queue_slave, Q#amqqueue.name), {ok, {not_started, Q}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -617,7 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, KS1 = lists:foldl(fun (ChPid0, KS0) -> pmon:demonitor(ChPid0, KS0) end, KS, AwaitGmDown), - rabbit_misc:store_proc_name(queue, QName), + rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName), rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 61e90105..b4c409b2 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/7, slave/7]). +-export([master_prepare/4, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -61,7 +61,8 @@ -type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), bqs()}). --spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). +-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(), + log_fun(), [pid()]) -> pid()). -spec(master_go/7 :: (pid(), reference(), log_fun(), rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), @@ -80,9 +81,12 @@ %% --------------------------------------------------------------------------- %% Master -master_prepare(Ref, Log, SPids) -> +master_prepare(Ref, QName, Log, SPids) -> MPid = self(), - spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). + spawn_link(fun () -> + rabbit_misc:store_proc_name(?MODULE, QName), + syncer(Ref, Log, MPid, SPids) + end). master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5caa340e..80e160d9 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -1085,8 +1085,8 @@ stop_timer(State, Idx) -> end end. -store_proc_name(Type, Identity) -> store_proc_name({Type, Identity}). -store_proc_name(TypeIdentity) -> put(process_name, TypeIdentity). +store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). +store_proc_name(TypeProcName) -> put(process_name, TypeProcName). %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 73ef0eb2..1620f89a 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_collector). --behaviour(gen_server). +-behaviour(gen_server2). -export([start_link/1, register/2, delete_all/1]). @@ -40,19 +40,18 @@ %%---------------------------------------------------------------------------- -start_link(Identity) -> - gen_server:start_link(?MODULE, [Identity], []). +start_link(ProcName) -> + gen_server2:start_link(?MODULE, [], [{proc_name, ProcName}]). register(CollectorPid, Q) -> - gen_server:call(CollectorPid, {register, Q}, infinity). + gen_server2:call(CollectorPid, {register, Q}, infinity). delete_all(CollectorPid) -> - gen_server:call(CollectorPid, delete_all, infinity). + gen_server2:call(CollectorPid, delete_all, infinity). %%---------------------------------------------------------------------------- -init([Identity]) -> - rabbit_misc:store_proc_name(queue_collector, Identity), +init([]) -> {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- @@ -80,7 +79,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors = QMons, delete_from = Deleting}) -> QMons1 = pmon:erase(DownPid, QMons), case Deleting =/= undefined andalso pmon:is_empty(QMons1) of - true -> gen_server:reply(Deleting, ok); + true -> gen_server2:reply(Deleting, ok); false -> ok end, {noreply, State#state{monitors = QMons1}}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e510f3f1..b3b341c5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -213,7 +213,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), - rabbit_misc:store_proc_name(reader, list_to_binary(Name)), + rabbit_misc:store_proc_name(?MODULE, list_to_binary(Name)), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index b1d4c5bd..3bb0c3fd 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -142,7 +142,7 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> enter_mainloop(Identity, State) -> Deb = sys:debug_options([]), - rabbit_misc:store_proc_name(writer, Identity), + rabbit_misc:store_proc_name(?MODULE, Identity), mainloop(Deb, State). mainloop(Deb, State) -> |