diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-12-03 13:08:41 +0000 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-12-03 13:08:41 +0000 |
commit | afff5b61be9f6b3abdfc093b976a0a4479302c75 (patch) | |
tree | 2a2b6095deaf7bc7e1d0d5d5adb5a954881edf42 | |
parent | 9e552c8457e7188631802d7d226078eda8adc085 (diff) | |
parent | 2a213f043860e20e836de3d735f4d38debc450a2 (diff) | |
download | rabbitmq-server-bug25321.tar.gz |
merge defaultbug25321
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 43 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 16 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 4 |
5 files changed, 31 insertions, 38 deletions
@@ -1 +1 @@ -Please see http://www.rabbitmq.com/build-server.html for build instructions.
\ No newline at end of file +Please see http://www.rabbitmq.com/build-server.html for build instructions. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5aec3bee..ed61e066 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -592,8 +592,8 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). -stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). +start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1df05922..f4459e45 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -122,11 +122,10 @@ info_keys() -> ?INFO_KEYS. init(Q) -> process_flag(trap_exit, true), - State = #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = backing_queue_module(Q), + backing_queue = undefined, backing_queue_state = undefined, active_consumers = queue:new(), expires = undefined, @@ -193,7 +192,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, State = #q{q = Q, - backing_queue = BQ, + backing_queue = undefined, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of #amqqueue{} = Q1 -> @@ -205,9 +204,11 @@ declare(Recover, From, State = #q{q = Q, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + BQ = backing_queue_module(Q1), BQS = bq_init(BQ, Q, Recover), recovery_barrier(Recover), - State1 = process_args(State#q{backing_queue_state = BQS}), + State1 = process_args(State#q{backing_queue = BQ, + backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -1152,23 +1153,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); -handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - reply(ok, State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = BQ:stop_mirroring(BQS), - reply(ok, State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), @@ -1298,6 +1282,23 @@ handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) end; +handle_cast(start_mirroring, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + noreply(State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_cast(stop_mirroring, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + noreply(State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 2b3bd027..58f20476 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -133,11 +133,11 @@ on_node_up() -> end end, [], rabbit_queue) end), - [{ok, _} = add_mirror(QName, node()) || QName <- QNames], + [add_mirror(QName, node()) || QName <- QNames], ok. drop_mirrors(QName, Nodes) -> - [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes], + [drop_mirror(QName, Node) || Node <- Nodes], ok. drop_mirror(QName, MirrorNode) -> @@ -159,7 +159,7 @@ drop_mirror(QName, MirrorNode) -> end). add_mirrors(QName, Nodes) -> - [{ok, _} = add_mirror(QName, Node) || Node <- Nodes], + [add_mirror(QName, Node) || Node <- Nodes], ok. add_mirror(QName, MirrorNode) -> @@ -183,15 +183,7 @@ start_child(Name, MirrorNode, Q) -> fun () -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of - {ok, undefined} -> - %% this means the mirror process was - %% already running on the given node. - {ok, already_mirrored}; - {ok, down} -> - %% Node went down between us deciding to start a mirror - %% and actually starting it. Which is fine. - {ok, node_down}; - {ok, SPid} -> + {ok, SPid} when is_pid(SPid) -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), {ok, started}; diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 2717cc92..2c997f16 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -166,8 +166,8 @@ update_policies(VHost) -> [update_queue(Q, Policies) || Q <- rabbit_amqqueue:list(VHost)]} end), - [notify(X) || X <- Xs], - [notify(Q) || Q <- Qs], + [catch notify(X) || X <- Xs], + [catch notify(Q) || Q <- Qs], ok. update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> |