diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-02-01 17:23:57 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-02-01 17:23:57 +0000 |
commit | eca129d8b936472c5c3dd280feca40fcb8d2e3bd (patch) | |
tree | 92e68de24b37ed1d291f5e676b6370fcb018d28b | |
parent | c6a6066ab88296f6c37f2a8eae549e907f01106b (diff) | |
parent | 4df7acf0962a40ad7c0d848fc500fe6f51f78e6c (diff) | |
download | rabbitmq-server-eca129d8b936472c5c3dd280feca40fcb8d2e3bd.tar.gz |
Merge heads
-rw-r--r-- | docs/rabbitmqctl.1.xml | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 14 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 100 |
3 files changed, 65 insertions, 56 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 4100864e..c1c51f9f 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -266,10 +266,9 @@ </para> <para> When the target files do not exist they are created. - target files do not already exist. When - no <option>suffix</option> is specified, the empty log - files are simply created at the original location; no - rotation takes place. + When no <option>suffix</option> is specified, the empty + log files are simply created at the original location; + no rotation takes place. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl rotate_logs .1</screen> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index d1caf5aa..226fbea0 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -136,12 +136,16 @@ add_mirror(Queue, MirrorNode) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), - rabbit_log:info( - "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, Result]), case Result of - {ok, _Pid} -> ok; - _ -> Result + {ok, undefined} -> %% Already running + ok; + {ok, _Pid} -> + rabbit_log:info( + "Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, Result]), + ok; + _ -> + Result end; [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} end diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 2cdc7637..1fd927dd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -90,7 +90,7 @@ }). start_link(Q) -> - gen_server2:start_link(?MODULE, [Q], []). + gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). @@ -98,55 +98,61 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init([#amqqueue { name = QueueName } = Q]) -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, +init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), - {ok, MPid} = - rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - %% ASSERTION - [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], - MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {ok, QPid} - end), - erlang:monitor(process, MPid), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [Self]), - ok = rabbit_memory_monitor:register( - Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), - {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new(), - - synchronised = false + case rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), + {new, QPid}; + [SPid] -> true = rabbit_misc:is_process_alive(SPid), + existing + end + end) of + {new, MPid} -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [Self]), + ok = rabbit_memory_monitor:register( + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), - ok = gm:broadcast(GM, request_length), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + {ok, State, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}; + existing -> + ignore + end. handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> |