diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 22 |
1 files changed, 15 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c4ae307c..e4d78c45 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -155,7 +155,8 @@ init_it(Self, Node, QueueName) -> case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of [] -> MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}), + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of @@ -166,8 +167,8 @@ init_it(Self, Node, QueueName) -> case rabbit_misc:is_process_alive(SPid) of true -> existing; false -> MPids1 = (MPids -- [SPid]) ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{ slave_pids = MPids1 }), + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), {new, QPid} end end. @@ -465,8 +466,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> - rabbit_event:notify(queue_slave_promoted, [{pid, self()}, - {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, @@ -933,8 +932,17 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, %% unsynchronised: we assert that can never happen. set_synchronised(true, State = #state { q = #amqqueue { name = QName }, synchronised = false }) -> - rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, - {name, QName}]), + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2) + end + end), State #state { synchronised = true }; set_synchronised(true, State) -> State; |