summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-31 17:35:02 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-31 17:35:02 +0000
commitf7190c8cb07ae8e639244be1f94f55bc5278aab3 (patch)
tree63279b5916042450b26b819f63280532e847eb04
parent91d719f2c84363efd989bc3d81c1f59d062a7898 (diff)
downloadrabbitmq-server-f7190c8cb07ae8e639244be1f94f55bc5278aab3.tar.gz
Two-stage-startup-with-mnesia-record, like rabbit_amqqueue_process.
-rw-r--r--src/rabbit_mirror_queue_slave.erl104
1 files changed, 57 insertions, 47 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 2cdc7637..0c25cf3a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -90,7 +90,9 @@
}).
start_link(Q) ->
- gen_server2:start_link(?MODULE, [Q], []).
+ {ok, Pid} = gen_server2:start_link(?MODULE, [], []),
+ gen_server2:call(Pid, {init, Q}, infinity),
+ {ok, Pid}.
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
@@ -98,55 +100,63 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) ->
gen_server2:call(QPid, info, infinity).
-init([#amqqueue { name = QueueName } = Q]) ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
+init([]) ->
+ {ok, not_started, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({init, #amqqueue { name = QueueName } = Q}, _From, not_started) ->
Self = self(),
Node = node(),
- {ok, MPid} =
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- %% ASSERTION
- [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
- MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {ok, QPid}
- end),
- erlang:monitor(process, MPid),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [Self]),
- ok = rabbit_memory_monitor:register(
- Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
- {ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new(),
-
- synchronised = false
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] -> MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
+ {new, QPid};
+ [SPid] -> true = rabbit_misc:is_process_alive(SPid),
+ existing
+ end
+ end) of
+ {new, MPid} ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ erlang:monitor(process, MPid),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
+ ok = rabbit_memory_monitor:register(
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = bq_init(BQ, Q, false),
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
- ok = gm:broadcast(GM, request_length),
- {ok, State, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ reply(ok, State);
+ existing ->
+ {stop, normal, existing, #state{}}
+ end;
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->