summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-11-26 12:30:36 +0000
committerTim Watson <tim@rabbitmq.com>2012-11-26 12:30:36 +0000
commit2a213f043860e20e836de3d735f4d38debc450a2 (patch)
tree872b6d6d5e427aa0b5cfac7f035825cc299bf611
parentee69523f13ef37026922ed973965e5e953827a3a (diff)
parent5818c2ad750183a9c15d06dbe1b392aff388ce93 (diff)
downloadrabbitmq-server-2a213f043860e20e836de3d735f4d38debc450a2.tar.gz
merge bug25309 into stable
-rw-r--r--README2
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl43
-rw-r--r--src/rabbit_mirror_queue_misc.erl16
-rw-r--r--src/rabbit_policy.erl4
5 files changed, 31 insertions, 38 deletions
diff --git a/README b/README
index 90e99e62..67e3a66a 100644
--- a/README
+++ b/README
@@ -1 +1 @@
-Please see http://www.rabbitmq.com/build-server.html for build instructions. \ No newline at end of file
+Please see http://www.rabbitmq.com/build-server.html for build instructions.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8ce1160c..c48aa6dd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -588,8 +588,8 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
-stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
+start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1df05922..f4459e45 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -122,11 +122,10 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
-
State = #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
+ backing_queue = undefined,
backing_queue_state = undefined,
active_consumers = queue:new(),
expires = undefined,
@@ -193,7 +192,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From, State = #q{q = Q,
- backing_queue = BQ,
+ backing_queue = undefined,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
#amqqueue{} = Q1 ->
@@ -205,9 +204,11 @@ declare(Recover, From, State = #q{q = Q,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
+ BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
recovery_barrier(Recover),
- State1 = process_args(State#q{backing_queue_state = BQS}),
+ State1 = process_args(State#q{backing_queue = BQ,
+ backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -1152,23 +1153,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
-handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- reply(ok, State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- reply(ok, State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -1298,6 +1282,23 @@ handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
+handle_cast(start_mirroring, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ noreply(State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
+handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ noreply(State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 2b3bd027..58f20476 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -133,11 +133,11 @@ on_node_up() ->
end
end, [], rabbit_queue)
end),
- [{ok, _} = add_mirror(QName, node()) || QName <- QNames],
+ [add_mirror(QName, node()) || QName <- QNames],
ok.
drop_mirrors(QName, Nodes) ->
- [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes],
+ [drop_mirror(QName, Node) || Node <- Nodes],
ok.
drop_mirror(QName, MirrorNode) ->
@@ -159,7 +159,7 @@ drop_mirror(QName, MirrorNode) ->
end).
add_mirrors(QName, Nodes) ->
- [{ok, _} = add_mirror(QName, Node) || Node <- Nodes],
+ [add_mirror(QName, Node) || Node <- Nodes],
ok.
add_mirror(QName, MirrorNode) ->
@@ -183,15 +183,7 @@ start_child(Name, MirrorNode, Q) ->
fun () ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
- {ok, undefined} ->
- %% this means the mirror process was
- %% already running on the given node.
- {ok, already_mirrored};
- {ok, down} ->
- %% Node went down between us deciding to start a mirror
- %% and actually starting it. Which is fine.
- {ok, node_down};
- {ok, SPid} ->
+ {ok, SPid} when is_pid(SPid) ->
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
{ok, started};
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 2717cc92..2c997f16 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -166,8 +166,8 @@ update_policies(VHost) ->
[update_queue(Q, Policies) ||
Q <- rabbit_amqqueue:list(VHost)]}
end),
- [notify(X) || X <- Xs],
- [notify(Q) || Q <- Qs],
+ [catch notify(X) || X <- Xs],
+ [catch notify(Q) || Q <- Qs],
ok.
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->