summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-07-09 12:01:16 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-07-09 12:01:16 +0100
commit3432b6db0947dbdaab853f980e12aa470154368e (patch)
tree188feaf4789e3b7fd1c0c6916e92db9e1e8c1d04
parent6c7532d5a7bb255f4209f13f6b6246ebd3ce4cf5 (diff)
downloadrabbitmq-server-3432b6db0947dbdaab853f980e12aa470154368e.tar.gz
Be more consistent about always cleaning sync_slave_pids whenever slave_pids is updated, abstract a bit, introduce an async wake_up message.
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_mirror_queue_misc.erl27
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
4 files changed, 27 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 32a33812..05aad4bd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,7 +22,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([force_event_refresh/0]).
+-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
@@ -102,6 +102,7 @@
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
+-spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -476,6 +477,8 @@ force_event_refresh(QNames) ->
force_event_refresh(Failed)
end.
+wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
+
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9d03805a..1bf25bb2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1281,7 +1281,10 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State);
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
- dead_letter_msg(Msg, AckTag, Reason, State).
+ dead_letter_msg(Msg, AckTag, Reason, State);
+
+handle_cast(wake_up, State) ->
+ noreply(State).
%% We need to not ignore this as we need to remove outstanding
%% confirms due to queue death.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index fbffbee1..83ecd4bf 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -18,7 +18,7 @@
-export([remove_from_queue/2, on_node_up/0,
drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
- report_deaths/4]).
+ report_deaths/4, store_updated_slaves/1]).
-include("rabbit.hrl").
@@ -37,6 +37,7 @@
-spec(add_mirror/3 ::
(rabbit_types:vhost(), binary(), atom())
-> rabbit_types:ok_or_error(any())).
+-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -58,9 +59,8 @@ remove_from_queue(QueueName, DeadPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- sync_slave_pids = SSPids}] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids}] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid), DeadNodes)],
@@ -71,13 +71,9 @@ remove_from_queue(QueueName, DeadPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- SSPids1 = [SSPid || SSPid <- SSPids,
- lists:member(SSPid, SPids1)],
- Q1 = Q #amqqueue { pid = QPid1,
- slave_pids = SPids1,
- sync_slave_pids = SSPids1},
- ok = rabbit_amqqueue:store_queue(Q1),
- rabbit_amqqueue:info(Q1, [name]), %% Wake it up
+ store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
@@ -177,3 +173,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
end,
rabbit_misc:pid_to_string(MirrorPid),
[[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
+
+store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ ok = rabbit_amqqueue:store_queue(Q1),
+ %% Wake it up so that we emit a stats event
+ rabbit_amqqueue:wake_up(Q1),
+ Q1.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a858ee4e..7939b48c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -107,8 +107,8 @@ init(#amqqueue { name = QueueName } = Q) ->
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
[] -> MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1 #amqqueue { slave_pids = MPids1 }),
{new, QPid};
[SPid] -> true = rabbit_misc:is_process_alive(SPid),
existing
@@ -914,10 +914,9 @@ set_synchronised(true, State = #state { q = Q = #amqqueue { name = QName },
ok;
[Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
- ok = rabbit_amqqueue:store_queue(Q2)
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2)
end
end),
- rabbit_amqqueue:info(Q, [name]), %% Wake it up
State #state { synchronised = true };
set_synchronised(true, State) ->
State;