summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-10-01 15:37:09 +0100
committerTim Watson <tim@rabbitmq.com>2013-10-01 15:37:09 +0100
commit29619c4c809b2db875de4a77ce3acd0432b625aa (patch)
tree3c18936ad714d8900e81a45c9d32fccd3a8d00fc
parent3d9ed7cd491a087fa63048111169d10b81576843 (diff)
parentbb5ef5c737a10a2661eb4eea3d3ecff7efa681df (diff)
downloadrabbitmq-server-29619c4c809b2db875de4a77ce3acd0432b625aa.tar.gz
merge bug25685 into stable
-rw-r--r--include/gm_specs.hrl9
-rw-r--r--src/gm.erl12
-rw-r--r--src/gm_soak_test.erl4
-rw-r--r--src/gm_speed_test.erl4
-rw-r--r--src/gm_tests.erl4
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_connection_sup.erl5
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl13
-rw-r--r--src/rabbit_mirror_queue_misc.erl6
-rw-r--r--src/rabbit_mirror_queue_slave.erl18
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_vhost.erl2
14 files changed, 53 insertions, 47 deletions
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index 81555c46..dc51f50e 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -20,9 +20,10 @@
-type(args() :: any()).
-type(members() :: [pid()]).
--spec(joined/2 :: (args(), members()) -> callback_result()).
--spec(members_changed/3 :: (args(), members(), members()) -> callback_result()).
--spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()).
--spec(terminate/2 :: (args(), term()) -> any()).
+-spec(joined/2 :: (args(), members()) -> callback_result()).
+-spec(members_changed/4 :: (args(), members(),
+ members(), members()) -> callback_result()).
+-spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()).
+-spec(terminate/2 :: (args(), term()) -> any()).
-endif.
diff --git a/src/gm.erl b/src/gm.erl
index 78099499..098d84fa 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -475,7 +475,7 @@
%% see members die that we have not seen born (or supplied in the
%% members to joined/2).
-callback members_changed(Args :: term(), Births :: [pid()],
- Deaths :: [pid()]) ->
+ Deaths :: [pid()], Live :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
@@ -494,7 +494,7 @@
-else.
behaviour_info(callbacks) ->
- [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
+ [{joined, 2}, {members_changed, 4}, {handle_msg, 3}, {terminate, 2}];
behaviour_info(_Other) ->
undefined.
@@ -678,7 +678,8 @@ handle_cast({validate_members, OldMembers},
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> noreply(State);
- _ -> Result = Module:members_changed(Args, Births, Deaths),
+ _ -> Result = Module:members_changed(
+ Args, Births, Deaths, NewMembers),
handle_callback_result({Result, State})
end;
@@ -1377,8 +1378,9 @@ callback_view_changed(Args, Module, OldView, NewView) ->
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> ok;
- _ -> Module:members_changed(Args, get_pids(Births),
- get_pids(Deaths))
+ _ -> Module:members_changed(
+ Args, get_pids(Births), get_pids(Deaths),
+ get_pids(NewMembers))
end.
handle_callback_result({Result, State}) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index b379d218..701cb0f7 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -17,7 +17,7 @@
-module(gm_soak_test).
-export([test/0]).
--export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
-behaviour(gm).
@@ -51,7 +51,7 @@ joined([], Members) ->
put(ts, now()),
ok.
-members_changed([], Births, Deaths) ->
+members_changed([], Births, Deaths, _Live) ->
with_state(
fun (State) ->
State1 =
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index 768cc462..0f65a792 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -17,7 +17,7 @@
-module(gm_speed_test).
-export([test/3]).
--export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
-export([wile_e_coyote/2]).
-behaviour(gm).
@@ -30,7 +30,7 @@ joined(Owner, _Members) ->
Owner ! joined,
ok.
-members_changed(_Owner, _Births, _Deaths) ->
+members_changed(_Owner, _Births, _Deaths, _Live) ->
ok.
handle_msg(Owner, _From, ping) ->
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 233702ad..9a348076 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -22,7 +22,7 @@
test_member_death/0,
test_receive_in_order/0,
all_tests/0]).
--export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
-behaviour(gm).
@@ -40,7 +40,7 @@ joined(Pid, Members) ->
Pid ! {joined, self(), Members},
ok.
-members_changed(Pid, Births, Deaths) ->
+members_changed(Pid, Births, Deaths, _Live) ->
Pid ! {members_changed, self(), Births, Deaths},
ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a1efaf65..32feac30 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -72,8 +72,8 @@
-spec(declare/5 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> {'new' | 'existing' | 'absent', rabbit_types:amqqueue()} |
- rabbit_types:channel_exit()).
+ -> {'new' | 'existing' | 'absent' | 'owner_died',
+ rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e61cba02..add75d89 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1069,14 +1069,8 @@ handle_call({init, Recover}, From,
declare(Recover, From, State);
false -> #q{backing_queue = undefined,
backing_queue_state = undefined,
- q = #amqqueue{name = QName} = Q} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- new -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n",
- [rabbit_misc:rs(QName)]);
- _ -> ok
- end,
+ q = Q} = State,
+ gen_server2:reply(From, {owner_died, Q}),
BQ = backing_queue_module(Q),
BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d6c1e8c0..6c04f4cd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1021,7 +1021,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% declare. Loop around again.
handle_method(Declare, none, State);
{absent, Q} ->
- rabbit_misc:absent(Q)
+ rabbit_misc:absent(Q);
+ {owner_died, _Q} ->
+ %% Presumably our own days are numbered since the
+ %% connection has died. Pretend the queue exists though,
+ %% just so nothing fails.
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
end;
{error, {absent, Q}} ->
rabbit_misc:absent(Q)
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index fee377e7..c1fa17aa 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,8 +42,9 @@ start_link() ->
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
- %% We need to get channels in the hierarchy here so they close
- %% before the reader. But for 1.0 readers we can't start the real
+ %% We need to get channels in the hierarchy here so they get shut
+ %% down after the reader, so the reader gets a chance to terminate
+ %% them cleanly. But for 1.0 readers we can't start the real
%% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
%% so we add another supervisor into the hierarchy.
{ok, ChannelSup3Pid} =
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index c9918fed..ab466c3c 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -21,7 +21,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([joined/2, members_changed/3, handle_msg/3]).
+-export([joined/2, members_changed/4, handle_msg/3]).
-behaviour(gen_server2).
-behaviour(gm).
@@ -348,10 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
-handle_cast({gm_deaths, Deaths},
+handle_cast({gm_deaths, LiveGMPids},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of
+ case rabbit_mirror_queue_misc:remove_from_queue(
+ QueueName, MPid, LiveGMPids) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
@@ -400,10 +401,10 @@ joined([CPid], Members) ->
CPid ! {joined, self(), Members},
ok.
-members_changed([_CPid], _Births, []) ->
+members_changed([_CPid], _Births, [], _Live) ->
ok;
-members_changed([CPid], _Births, Deaths) ->
- ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
+members_changed([CPid], _Births, _Deaths, Live) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Live}).
handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 335b7c81..4ea1d984 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -69,7 +69,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, Self, DeadGMPids) ->
+remove_from_queue(QueueName, Self, LiveGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -79,9 +79,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
gm_pids = GMPids }] ->
- {Dead, GMPids1} = lists:partition(
+ {GMPids1, Dead} = lists:partition(
fun ({GM, _}) ->
- lists:member(GM, DeadGMPids)
+ lists:member(GM, LiveGMPids)
end, GMPids),
DeadPids = [Pid || {_GM, Pid} <- Dead],
Alive = [QPid | SPids] -- DeadPids,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ec57fdc7..a5d1f68e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -30,7 +30,7 @@
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
--export([joined/2, members_changed/3, handle_msg/3]).
+-export([joined/2, members_changed/4, handle_msg/3]).
-behaviour(gen_server2).
-behaviour(gm).
@@ -182,10 +182,10 @@ handle_call({deliver, Delivery, true}, From, State) ->
gen_server2:reply(From, ok),
noreply(maybe_enqueue_message(Delivery, State));
-handle_call({gm_deaths, Deaths}, From,
+handle_call({gm_deaths, LiveGMPids}, From,
State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
Self = self(),
- case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
@@ -337,7 +337,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
- {gm_deaths, _Deaths} -> 5;
+ {gm_deaths, _Live} -> 5;
_ -> 0
end.
@@ -365,8 +365,10 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) -> ok;
-members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, [], _Live) ->
+ ok;
+members_changed([ SPid], _Births, _Deaths, Live) ->
+ inform_deaths(SPid, Live).
handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
@@ -394,8 +396,8 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
-inform_deaths(SPid, Deaths) ->
- case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+inform_deaths(SPid, Live) ->
+ case gen_server2:call(SPid, {gm_deaths, Live}, infinity) of
ok -> ok;
{promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 5fa29b7e..d058ac36 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -518,7 +518,7 @@ copy_db(Destination) ->
rabbit_file:recursive_copy(dir(), Destination).
force_load_filename() ->
- filename:join(rabbit_mnesia:dir(), "force_load").
+ filename:join(dir(), "force_load").
force_load_next_boot() ->
rabbit_file:write_file(force_load_filename(), <<"">>).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index fcf77bf9..8d013d43 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -122,7 +122,7 @@ with(VHostPath, Thunk) ->
end.
%% Like with/2 but outside an Mnesia tx
-assert(VHostPath) -> case rabbit_vhost:exists(VHostPath) of
+assert(VHostPath) -> case exists(VHostPath) of
true -> ok;
false -> throw({error, {no_such_vhost, VHostPath}})
end.