summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-07-27 15:24:06 +0100
committerTim Watson <tim@rabbitmq.com>2012-07-27 15:24:06 +0100
commitbed6e91bbd2af7263690f94581517f2a3214a540 (patch)
tree43a311e2039c81fd984e044a3d0110b8f8094ace
parent4006546389f82cb24296089da6e3fd601bda339e (diff)
downloadrabbitmq-server-bed6e91bbd2af7263690f94581517f2a3214a540.tar.gz
Backport 20073c3da8c3 (Merge of bug24988; Repeated declare / delete of the same HA queue blows up)
-rw-r--r--src/gm.erl10
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
4 files changed, 26 insertions, 16 deletions
diff --git a/src/gm.erl b/src/gm.erl
index eb93e4c4..6e6aa852 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -562,7 +562,7 @@ handle_call(group_members, _From,
reply(not_joined, State);
handle_call(group_members, _From, State = #state { view = View }) ->
- reply(alive_view_members(View), State);
+ reply(get_pids(alive_view_members(View)), State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -651,7 +651,7 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
+handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
@@ -665,8 +665,10 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
- case Member of
- undefined ->
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
View1 =
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 71e0507a..3e058793 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -405,9 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
-handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) ->
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, Msg),
- {stop, Reason};
+ {stop, {shutdown, ring_shutdown}};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4e71cc43..750bcd56 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,21 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
+ Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ monitor_wait(MRefs),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
+monitor_wait([]) ->
+ ok;
+monitor_wait([MRef | MRefs]) ->
+ receive({'DOWN', MRef, process, _Pid, _Info}) ->
+ ok
+ end,
+ monitor_wait(MRefs).
+
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e412fbbc..03fafc3e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
ok;
handle_msg([SPid], _From, {process_death, Pid}) ->
inform_deaths(SPid, [Pid]);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, {gm, Msg}),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Deaths) ->
- rabbit_misc:with_exit_handler(
- fun () -> {stop, normal} end,
- fun () ->
- case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
- ok ->
- ok;
- {promote, CPid} ->
- {become, rabbit_mirror_queue_coordinator, [CPid]}
- end
- end).
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
%% ---------------------------------------------------------------------------
%% Others