summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl22
1 files changed, 15 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c4ae307c..e4d78c45 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -155,7 +155,8 @@ init_it(Self, Node, QueueName) ->
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
[] ->
MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
{new, QPid};
[QPid] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -166,8 +167,8 @@ init_it(Self, Node, QueueName) ->
case rabbit_misc:is_process_alive(SPid) of
true -> existing;
false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{ slave_pids = MPids1 }),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
{new, QPid}
end
end.
@@ -465,8 +466,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
- rabbit_event:notify(queue_slave_promoted, [{pid, self()},
- {name, QName}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
[rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
@@ -933,8 +932,17 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
%% unsynchronised: we assert that can never happen.
set_synchronised(true, State = #state { q = #amqqueue { name = QName },
synchronised = false }) ->
- rabbit_event:notify(queue_slave_synchronised, [{pid, self()},
- {name, QName}]),
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2)
+ end
+ end),
State #state { synchronised = true };
set_synchronised(true, State) ->
State;