diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-31 17:35:02 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-31 17:35:02 +0000 |
commit | f7190c8cb07ae8e639244be1f94f55bc5278aab3 (patch) | |
tree | 63279b5916042450b26b819f63280532e847eb04 | |
parent | 91d719f2c84363efd989bc3d81c1f59d062a7898 (diff) | |
download | rabbitmq-server-f7190c8cb07ae8e639244be1f94f55bc5278aab3.tar.gz |
Two-stage-startup-with-mnesia-record, like rabbit_amqqueue_process.
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 104 |
1 files changed, 57 insertions, 47 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 2cdc7637..0c25cf3a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -90,7 +90,9 @@ }). start_link(Q) -> - gen_server2:start_link(?MODULE, [Q], []). + {ok, Pid} = gen_server2:start_link(?MODULE, [], []), + gen_server2:call(Pid, {init, Q}, infinity), + {ok, Pid}. set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). @@ -98,55 +100,63 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init([#amqqueue { name = QueueName } = Q]) -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, +init([]) -> + {ok, not_started, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({init, #amqqueue { name = QueueName } = Q}, _From, not_started) -> Self = self(), Node = node(), - {ok, MPid} = - rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - %% ASSERTION - [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], - MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {ok, QPid} - end), - erlang:monitor(process, MPid), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [Self]), - ok = rabbit_memory_monitor:register( - Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), - {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new(), - - synchronised = false + case rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), + {new, QPid}; + [SPid] -> true = rabbit_misc:is_process_alive(SPid), + existing + end + end) of + {new, MPid} -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [Self]), + ok = rabbit_memory_monitor:register( + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), - ok = gm:broadcast(GM, request_length), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + reply(ok, State); + existing -> + {stop, normal, existing, #state{}} + end; handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> |