diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-09-11 16:54:19 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-09-11 16:54:19 +0100 |
commit | 7998299ddb26c5c8766a4eb1017c8b1776f7957c (patch) | |
tree | 1090b1a0a20e7cd04e6269d186a16b8e253cc25f | |
parent | 75bd64e77ba5aca38859971156404faa49409972 (diff) | |
parent | ecfd2b2f60ad2e5802a5d533234ab28b13cc7bd3 (diff) | |
download | rabbitmq-server-7998299ddb26c5c8766a4eb1017c8b1776f7957c.tar.gz |
Merge in defaultbug25276
-rw-r--r-- | include/gm_specs.hrl | 9 | ||||
-rw-r--r-- | src/gm.erl | 12 | ||||
-rw-r--r-- | src/gm_soak_test.erl | 4 | ||||
-rw-r--r-- | src/gm_speed_test.erl | 4 | ||||
-rw-r--r-- | src/gm_tests.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 18 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 2 |
12 files changed, 53 insertions, 48 deletions
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl index 81555c46..dc51f50e 100644 --- a/include/gm_specs.hrl +++ b/include/gm_specs.hrl @@ -20,9 +20,10 @@ -type(args() :: any()). -type(members() :: [pid()]). --spec(joined/2 :: (args(), members()) -> callback_result()). --spec(members_changed/3 :: (args(), members(), members()) -> callback_result()). --spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()). --spec(terminate/2 :: (args(), term()) -> any()). +-spec(joined/2 :: (args(), members()) -> callback_result()). +-spec(members_changed/4 :: (args(), members(), + members(), members()) -> callback_result()). +-spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()). +-spec(terminate/2 :: (args(), term()) -> any()). -endif. @@ -475,7 +475,7 @@ %% see members die that we have not seen born (or supplied in the %% members to joined/2). -callback members_changed(Args :: term(), Births :: [pid()], - Deaths :: [pid()]) -> + Deaths :: [pid()], Live :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Supplied with Args provided in start_link, the sender, and the @@ -494,7 +494,7 @@ -else. behaviour_info(callbacks) -> - [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}]; + [{joined, 2}, {members_changed, 4}, {handle_msg, 3}, {terminate, 2}]; behaviour_info(_Other) -> undefined. @@ -678,7 +678,8 @@ handle_cast({validate_members, OldMembers}, Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> noreply(State); - _ -> Result = Module:members_changed(Args, Births, Deaths), + _ -> Result = Module:members_changed( + Args, Births, Deaths, NewMembers), handle_callback_result({Result, State}) end; @@ -1377,8 +1378,9 @@ callback_view_changed(Args, Module, OldView, NewView) -> Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> ok; - _ -> Module:members_changed(Args, get_pids(Births), - get_pids(Deaths)) + _ -> Module:members_changed( + Args, get_pids(Births), get_pids(Deaths), + get_pids(NewMembers)) end. handle_callback_result({Result, State}) -> diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index b379d218..701cb0f7 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -17,7 +17,7 @@ -module(gm_soak_test). -export([test/0]). --export([joined/2, members_changed/3, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/4, handle_msg/3, terminate/2]). -behaviour(gm). @@ -51,7 +51,7 @@ joined([], Members) -> put(ts, now()), ok. -members_changed([], Births, Deaths) -> +members_changed([], Births, Deaths, _Live) -> with_state( fun (State) -> State1 = diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index 768cc462..0f65a792 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -17,7 +17,7 @@ -module(gm_speed_test). -export([test/3]). --export([joined/2, members_changed/3, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/4, handle_msg/3, terminate/2]). -export([wile_e_coyote/2]). -behaviour(gm). @@ -30,7 +30,7 @@ joined(Owner, _Members) -> Owner ! joined, ok. -members_changed(_Owner, _Births, _Deaths) -> +members_changed(_Owner, _Births, _Deaths, _Live) -> ok. handle_msg(Owner, _From, ping) -> diff --git a/src/gm_tests.erl b/src/gm_tests.erl index 233702ad..9a348076 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -22,7 +22,7 @@ test_member_death/0, test_receive_in_order/0, all_tests/0]). --export([joined/2, members_changed/3, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/4, handle_msg/3, terminate/2]). -behaviour(gm). @@ -40,7 +40,7 @@ joined(Pid, Members) -> Pid ! {joined, self(), Members}, ok. -members_changed(Pid, Births, Deaths) -> +members_changed(Pid, Births, Deaths, _Live) -> Pid ! {members_changed, self(), Births, Deaths}, ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 38d72479..65e42383 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -72,8 +72,8 @@ -spec(declare/5 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) - -> {'new' | 'existing' | 'absent', rabbit_types:amqqueue()} | - rabbit_types:channel_exit()). + -> {'new' | 'existing' | 'absent' | 'owner_died', + rabbit_types:amqqueue()} | rabbit_types:channel_exit()). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9f864d28..58b93f03 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1104,6 +1104,10 @@ handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); +%% You used to be able to declare an exclusive durable queue. Sadly we +%% need to still tidy up after that case, there could be the remnants +%% of one left over from an upgrade. So that's why we don't enforce +%% Recover = false here. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> case rabbit_misc:is_process_alive(Owner) of @@ -1111,14 +1115,8 @@ handle_call({init, Recover}, From, declare(Recover, From, State); false -> #q{backing_queue = undefined, backing_queue_state = undefined, - q = #amqqueue{name = QName} = Q} = State, - gen_server2:reply(From, not_found), - case Recover of - new -> rabbit_log:warning( - "exclusive owner for ~s went away~n", - [rabbit_misc:rs(QName)]); - _ -> ok %% [1] - end, + q = Q} = State, + gen_server2:reply(From, {owner_died, Q}), BQ = backing_queue_module(Q), BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. @@ -1126,10 +1124,6 @@ handle_call({init, Recover}, From, State#q{backing_queue = BQ, backing_queue_state = BQS}} end; -%% [1] You used to be able to declare an exclusive durable queue. Sadly we -%% need to still tidy up after that case, there could be the remnants of one -%% left over from an upgrade. - handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d59992cc..dc37959b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1023,7 +1023,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% declare. Loop around again. handle_method(Declare, none, State); {absent, Q} -> - rabbit_misc:absent(Q) + rabbit_misc:absent(Q); + {owner_died, _Q} -> + %% Presumably our own days are numbered since the + %% connection has died. Pretend the queue exists though, + %% just so nothing fails. + return_queue_declare_ok(QueueName, NoWait, 0, 0, State) end; {error, {absent, Q}} -> rabbit_misc:absent(Q) diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index f54e9bd1..a0e8bcc6 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -21,7 +21,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([joined/2, members_changed/3, handle_msg/3]). +-export([joined/2, members_changed/4, handle_msg/3]). -behaviour(gen_server2). -behaviour(gm). @@ -347,10 +347,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). -handle_cast({gm_deaths, Deaths}, +handle_cast({gm_deaths, LiveGMPids}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> - case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of + case rabbit_mirror_queue_misc:remove_from_queue( + QueueName, MPid, LiveGMPids) of {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), @@ -399,10 +400,10 @@ joined([CPid], Members) -> CPid ! {joined, self(), Members}, ok. -members_changed([_CPid], _Births, []) -> +members_changed([_CPid], _Births, [], _Live) -> ok; -members_changed([CPid], _Births, Deaths) -> - ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). +members_changed([CPid], _Births, _Deaths, Live) -> + ok = gen_server2:cast(CPid, {gm_deaths, Live}). handle_msg([CPid], _From, request_depth = Msg) -> ok = gen_server2:cast(CPid, Msg); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index a5a1d922..80bcf54f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -69,7 +69,7 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, Self, DeadGMPids) -> +remove_from_queue(QueueName, Self, LiveGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -79,9 +79,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] -> - {Dead, GMPids1} = lists:partition( + {GMPids1, Dead} = lists:partition( fun ({GM, _}) -> - lists:member(GM, DeadGMPids) + lists:member(GM, LiveGMPids) end, GMPids), DeadPids = [Pid || {_GM, Pid} <- Dead], Alive = [QPid | SPids] -- DeadPids, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 18f848c3..d508f0e9 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -30,7 +30,7 @@ code_change/3, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). --export([joined/2, members_changed/3, handle_msg/3]). +-export([joined/2, members_changed/4, handle_msg/3]). -behaviour(gen_server2). -behaviour(gm). @@ -168,10 +168,10 @@ handle_call({deliver, Delivery, true}, From, State) -> gen_server2:reply(From, ok), noreply(maybe_enqueue_message(Delivery, State)); -handle_call({gm_deaths, Deaths}, From, +handle_call({gm_deaths, LiveGMPids}, From, State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> Self = self(), - case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of + case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; @@ -324,7 +324,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; - {gm_deaths, _Deaths} -> 5; + {gm_deaths, _Live} -> 5; _ -> 0 end. @@ -352,8 +352,10 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, []) -> ok; -members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). +members_changed([_SPid], _Births, [], _Live) -> + ok; +members_changed([ SPid], _Births, _Deaths, Live) -> + inform_deaths(SPid, Live). handle_msg([_SPid], _From, request_depth) -> %% This is only of value to the master @@ -381,8 +383,8 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). -inform_deaths(SPid, Deaths) -> - case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of +inform_deaths(SPid, Live) -> + case gen_server2:call(SPid, {gm_deaths, Live}, infinity) of ok -> ok; {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} end. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index fcf77bf9..8d013d43 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -122,7 +122,7 @@ with(VHostPath, Thunk) -> end. %% Like with/2 but outside an Mnesia tx -assert(VHostPath) -> case rabbit_vhost:exists(VHostPath) of +assert(VHostPath) -> case exists(VHostPath) of true -> ok; false -> throw({error, {no_such_vhost, VHostPath}}) end. |