summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-16 14:02:41 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-16 14:02:41 +0100
commitb7aba78211abfd80c7fda5d4967219188978dc08 (patch)
treee3cb57aff09597ffce6f01437513592648e83cb7
parente352e4c34c92ff3be1cb6beb3692b9a31e194ede (diff)
downloadrabbitmq-server-b7aba78211abfd80c7fda5d4967219188978dc08.tar.gz
refactor: don't track slave's master_pid separately
-rw-r--r--src/rabbit_mirror_queue_slave.erl34
1 files changed, 16 insertions, 18 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6caf135b..7b6e4dd1 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -64,7 +64,6 @@
-record(state, { q,
gm,
- master_pid,
backing_queue,
backing_queue_state,
sync_timer_ref,
@@ -87,7 +86,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(#amqqueue { name = QueueName } = Q) ->
+init(Q = #amqqueue { name = QName }) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -100,23 +99,23 @@ init(#amqqueue { name = QueueName } = Q) ->
%% above.
%%
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()]),
receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun() -> init_it(Self, Node, QueueName) end) of
- {new, MPid} ->
- erlang:monitor(process, MPid),
+ fun() -> init_it(Self, Node, QName) end) of
+ {new, QPid} ->
+ erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
+ Q1 = Q #amqqueue { pid = QPid },
+ BQS = bq_init(BQ, Q1, false),
+ State = #state { q = Q1,
gm = GM,
- master_pid = MPid,
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
@@ -144,9 +143,9 @@ init(#amqqueue { name = QueueName } = Q) ->
ignore
end.
-init_it(Self, Node, QueueName) ->
+init_it(Self, Node, QName) ->
[Q = #amqqueue { pid = QPid, slave_pids = SPids }] =
- mnesia:read({rabbit_queue, QueueName}),
+ mnesia:read({rabbit_queue, QName}),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self),
{new, QPid};
@@ -174,16 +173,15 @@ handle_call({deliver, Delivery, true}, From, State) ->
noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
- State = #state { q = #amqqueue { name = QueueName },
- master_pid = MPid }) ->
+ State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
%% The GM has told us about deaths, which means we're not going to
%% receive any more messages from GM
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Deaths) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
{ok, Pid, DeadPids} ->
- rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ rabbit_mirror_queue_misc:report_deaths(self(), false, QName,
DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
@@ -197,7 +195,7 @@ handle_call({gm_deaths, Deaths}, From,
%% master has changed to not us.
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
- noreply(State #state { master_pid = Pid })
+ noreply(State #state { q = Q #amqqueue { pid = Pid } })
end
end;
@@ -247,7 +245,7 @@ handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
- State = #state { gm = GM, master_pid = MPid }) ->
+ State = #state { gm = GM, q = #amqqueue { pid = MPid } }) ->
ok = gm:broadcast(GM, process_death),
noreply(State);
@@ -370,7 +368,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
-i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid;
i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).