diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-16 14:02:41 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-16 14:02:41 +0100 |
commit | b7aba78211abfd80c7fda5d4967219188978dc08 (patch) | |
tree | e3cb57aff09597ffce6f01437513592648e83cb7 | |
parent | e352e4c34c92ff3be1cb6beb3692b9a31e194ede (diff) | |
download | rabbitmq-server-b7aba78211abfd80c7fda5d4967219188978dc08.tar.gz |
refactor: don't track slave's master_pid separately
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 34 |
1 files changed, 16 insertions, 18 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6caf135b..7b6e4dd1 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -64,7 +64,6 @@ -record(state, { q, gm, - master_pid, backing_queue, backing_queue_state, sync_timer_ref, @@ -87,7 +86,7 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init(#amqqueue { name = QueueName } = Q) -> +init(Q = #amqqueue { name = QName }) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -100,23 +99,23 @@ init(#amqqueue { name = QueueName } = Q) -> %% above. %% process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + {ok, GM} = gm:start_link(QName, ?MODULE, [self()]), receive {joined, GM} -> ok end, Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun() -> init_it(Self, Node, QueueName) end) of - {new, MPid} -> - erlang:monitor(process, MPid), + fun() -> init_it(Self, Node, QName) end) of + {new, QPid} -> + erlang:monitor(process, QPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, + Q1 = Q #amqqueue { pid = QPid }, + BQS = bq_init(BQ, Q1, false), + State = #state { q = Q1, gm = GM, - master_pid = MPid, backing_queue = BQ, backing_queue_state = BQS, rate_timer_ref = undefined, @@ -144,9 +143,9 @@ init(#amqqueue { name = QueueName } = Q) -> ignore end. -init_it(Self, Node, QueueName) -> +init_it(Self, Node, QName) -> [Q = #amqqueue { pid = QPid, slave_pids = SPids }] = - mnesia:read({rabbit_queue, QueueName}), + mnesia:read({rabbit_queue, QName}), case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of [] -> add_slave(Q, Self), {new, QPid}; @@ -174,16 +173,15 @@ handle_call({deliver, Delivery, true}, From, State) -> noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, - State = #state { q = #amqqueue { name = QueueName }, - master_pid = MPid }) -> + State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> %% The GM has told us about deaths, which means we're not going to %% receive any more messages from GM - case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + case rabbit_mirror_queue_misc:remove_from_queue(QName, Deaths) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; {ok, Pid, DeadPids} -> - rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + rabbit_mirror_queue_misc:report_deaths(self(), false, QName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed @@ -197,7 +195,7 @@ handle_call({gm_deaths, Deaths}, From, %% master has changed to not us. gen_server2:reply(From, ok), erlang:monitor(process, Pid), - noreply(State #state { master_pid = Pid }) + noreply(State #state { q = Q #amqqueue { pid = Pid } }) end end; @@ -247,7 +245,7 @@ handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, - State = #state { gm = GM, master_pid = MPid }) -> + State = #state { gm = GM, q = #amqqueue { pid = MPid } }) -> ok = gm:broadcast(GM, process_death), noreply(State); @@ -370,7 +368,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; -i(master_pid, #state { master_pid = MPid }) -> MPid; +i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid; i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). |