diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-07-09 12:01:16 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-07-09 12:01:16 +0100 |
commit | 3432b6db0947dbdaab853f980e12aa470154368e (patch) | |
tree | 188feaf4789e3b7fd1c0c6916e92db9e1e8c1d04 | |
parent | 6c7532d5a7bb255f4209f13f6b6246ebd3ce4cf5 (diff) | |
download | rabbitmq-server-3432b6db0947dbdaab853f980e12aa470154368e.tar.gz |
Be more consistent about always cleaning sync_slave_pids whenever slave_pids is updated, abstract a bit, introduce an async wake_up message.
-rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 27 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 |
4 files changed, 27 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 32a33812..05aad4bd 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,7 +22,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0]). +-export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). @@ -102,6 +102,7 @@ -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). +-spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -476,6 +477,8 @@ force_event_refresh(QNames) -> force_event_refresh(Failed) end. +wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). + consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9d03805a..1bf25bb2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1281,7 +1281,10 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State); handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> - dead_letter_msg(Msg, AckTag, Reason, State). + dead_letter_msg(Msg, AckTag, Reason, State); + +handle_cast(wake_up, State) -> + noreply(State). %% We need to not ignore this as we need to remove outstanding %% confirms due to queue death. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index fbffbee1..83ecd4bf 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,7 @@ -export([remove_from_queue/2, on_node_up/0, drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4]). + report_deaths/4, store_updated_slaves/1]). -include("rabbit.hrl"). @@ -37,6 +37,7 @@ -spec(add_mirror/3 :: (rabbit_types:vhost(), binary(), atom()) -> rabbit_types:ok_or_error(any())). +-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -58,9 +59,8 @@ remove_from_queue(QueueName, DeadPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - sync_slave_pids = SSPids}] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids}] -> [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], @@ -71,13 +71,9 @@ remove_from_queue(QueueName, DeadPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - SSPids1 = [SSPid || SSPid <- SSPids, - lists:member(SSPid, SPids1)], - Q1 = Q #amqqueue { pid = QPid1, - slave_pids = SPids1, - sync_slave_pids = SSPids1}, - ok = rabbit_amqqueue:store_queue(Q1), - rabbit_amqqueue:info(Q1, [name]), %% Wake it up + store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, @@ -177,3 +173,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> end, rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). + +store_updated_slaves(Q = #amqqueue{slave_pids = SPids, + sync_slave_pids = SSPids}) -> + SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], + Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, + ok = rabbit_amqqueue:store_queue(Q1), + %% Wake it up so that we emit a stats event + rabbit_amqqueue:wake_up(Q1), + Q1. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a858ee4e..7939b48c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -107,8 +107,8 @@ init(#amqqueue { name = QueueName } = Q) -> mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of [] -> MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), + rabbit_mirror_queue_misc:store_updated_slaves( + Q1 #amqqueue { slave_pids = MPids1 }), {new, QPid}; [SPid] -> true = rabbit_misc:is_process_alive(SPid), existing @@ -914,10 +914,9 @@ set_synchronised(true, State = #state { q = Q = #amqqueue { name = QName }, ok; [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, - ok = rabbit_amqqueue:store_queue(Q2) + rabbit_mirror_queue_misc:store_updated_slaves(Q2) end end), - rabbit_amqqueue:info(Q, [name]), %% Wake it up State #state { synchronised = true }; set_synchronised(true, State) -> State; |