summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-06-29 13:51:09 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-06-29 13:51:09 +0100
commit1791a50cfb0a9db025b74c701744e0dd45c91ef7 (patch)
treed3b64592b4a5418adb1d63fb6d355577a2c6f9c7
parenta82ee24122224a9d75f7b795d638145f9f6de49b (diff)
parenta6fc80b7caef30f0a157add3aa56aa9a4ff9a8c5 (diff)
downloadrabbitmq-server-1791a50cfb0a9db025b74c701744e0dd45c91ef7.tar.gz
Merged bug24826 into default
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--packaging/RPMS/Fedora/Makefile1
-rw-r--r--packaging/common/rabbitmq-script-wrapper4
-rw-r--r--packaging/debs/Debian/Makefile1
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init14
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server.bat4
-rwxr-xr-xscripts/rabbitmq-service.bat5
-rw-r--r--src/gm.erl97
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_control_main.erl5
-rw-r--r--src/rabbit_disk_monitor.erl6
-rw-r--r--src/rabbit_file.erl17
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl8
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
-rw-r--r--src/rabbit_reader.erl8
18 files changed, 113 insertions, 105 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index b7d14f20..523b54ce 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,9 +19,11 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
- {disk_free_limit, {mem_relative, 1.0}},
+ {disk_free_limit, 1000000000}, %% 1GB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ %% 0 ("no limit") would make a better default, but that
+ %% breaks the QPid Java client
{frame_max, 131072},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 180500ed..03e513f8 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -42,6 +42,7 @@ ifeq "$(RPM_OS)" "fedora"
SOURCES/rabbitmq-server.init
endif
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@||' \
SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 0e59c218..e832aed6 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -29,7 +29,9 @@ cd /var/lib/rabbitmq
SCRIPT=`basename $0`
-if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
+if [ `id -u` = `id -u rabbitmq` -a "$SCRIPT" = "rabbitmq-server" ] ; then
+ /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@
+elif [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} "$@"
elif [ `id -u` = 0 ] ; then
@SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 844388c6..1e4bf755 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -23,6 +23,7 @@ package: clean
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@| > "/var/log/rabbitmq/startup_log" 2> "/var/log/rabbitmq/startup_err"|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
index 4bc32515..b2d3f86a 100644
--- a/packaging/debs/Debian/debian/rabbitmq-server.init
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -60,10 +60,7 @@ start_rabbitmq () {
set +e
RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \
--chuid rabbitmq --start --exec $DAEMON \
- --pidfile "$RABBITMQ_PID_FILE" \
- > "${INIT_LOG_DIR}/startup_log" \
- 2> "${INIT_LOG_DIR}/startup_err" \
- 0<&- &
+ --pidfile "$RABBITMQ_PID_FILE" --background
$CONTROL wait $PID_FILE >/dev/null 2>&1
RETVAL=$?
set -e
@@ -143,6 +140,7 @@ start_stop_end() {
3)
log_warning_msg "${DESC} already ${1}"
log_end_msg 0
+ RETVAL=0
;;
*)
log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}"
@@ -153,12 +151,12 @@ start_stop_end() {
case "$1" in
start)
- log_daemon_msg "Starting ${DESC}:" $NAME
+ log_daemon_msg "Starting ${DESC}" $NAME
start_rabbitmq
start_stop_end "running"
;;
stop)
- log_daemon_msg "Stopping ${DESC}:" $NAME
+ log_daemon_msg "Stopping ${DESC}" $NAME
stop_rabbitmq
start_stop_end "stopped"
;;
@@ -171,12 +169,12 @@ case "$1" in
log_action_end_msg $RETVAL
;;
force-reload|reload|restart)
- log_daemon_msg "Restarting ${DESC}:" $NAME
+ log_daemon_msg "Restarting ${DESC}" $NAME
restart_rabbitmq
restart_end
;;
try-restart)
- log_daemon_msg "Restarting ${DESC}:" $NAME
+ log_daemon_msg "Restarting ${DESC}" $NAME
restart_running_rabbitmq
restart_end
;;
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 3c268726..c67a0263 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -43,9 +43,9 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-)
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index b8822739..167f272e 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -86,8 +86,8 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 849bedcf..4758c861 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -154,7 +154,10 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/gm.erl b/src/gm.erl
index 97c81ec6..f88ed18f 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -558,7 +558,7 @@ handle_call(group_members, _From,
reply(not_joined, State);
handle_call(group_members, _From, State = #state { view = View }) ->
- reply(alive_view_members(View), State);
+ reply(get_pids(alive_view_members(View)), State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -647,7 +647,7 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
+handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
@@ -661,8 +661,10 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
- case Member of
- undefined ->
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
View1 =
@@ -876,11 +878,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -899,17 +899,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1150,10 +1146,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1242,23 +1236,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1280,16 +1270,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1393,34 +1379,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fc5d4e93..fda489fe 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -323,8 +323,6 @@ boot() ->
start_it(StartFun) ->
try
StartFun()
- catch _:Reason ->
- boot_error("Error description:~n~n~p~n~n", [Reason])
after
%% give the error loggers some time to catch up
timer:sleep(100)
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 637a7d4d..afbaea65 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -171,6 +171,9 @@
[queue_name, channel_pid, consumer_tag, ack_required]).
start() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
@@ -530,7 +533,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -595,7 +598,8 @@ on_node_down(Node) ->
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
+ node(Pid) == Node andalso
+ not is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 2e163cfb..b23088cc 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -156,6 +156,11 @@ start() ->
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
usage();
+ {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) ->
+ %% We handle this common case specially to avoid ~p since
+ %% that has i18n issues
+ print_error("~s: ~s", [Problem, Reason]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index d9e8e8e4..58375abb 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -27,7 +27,7 @@
set_check_interval/1, get_disk_free/0]).
-define(SERVER, ?MODULE).
--define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 10000).
-record(state, {dir,
limit,
@@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"]));
-get_disk_free(_, _) ->
- unknown.
+get_disk_free(_, Platform) ->
+ {unknown, Platform}.
parse_free_unix(CommandResult) ->
[_, Stats | _] = string:tokens(CommandResult, "\n"),
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 59df14f3..a95f8f26 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -102,9 +102,12 @@ read_file_info(File) ->
with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
with_fhc_handle(Fun) ->
- ok = file_handle_cache:obtain(),
+ with_fhc_handle(1, Fun).
+
+with_fhc_handle(N, Fun) ->
+ [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
try Fun()
- after ok = file_handle_cache:release()
+ after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
end.
read_term_file(File) ->
@@ -165,7 +168,7 @@ make_binary(List) ->
{error, Reason}
end.
-
+%% TODO the semantics of this function are rather odd. But see bug 25021.
append_file(File, Suffix) ->
case read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -183,9 +186,11 @@ append_file(File, 0, Suffix) ->
end
end);
append_file(File, _, Suffix) ->
- case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
- {ok, Data} -> write_file([File, Suffix], Data, [append]);
- Error -> Error
+ case with_fhc_handle(2, fun () ->
+ file:copy(File, {[File, Suffix], [append]})
+ end) of
+ {ok, _BytesCopied} -> ok;
+ Error -> Error
end.
ensure_parent_dirs_exist(Filename) ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 17e2ffb4..3e058793 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
+
+handle_cast({delete_and_terminate, Reason}, State) ->
+ {stop, Reason, State}.
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
@@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4e71cc43..750bcd56 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,21 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
+ Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ monitor_wait(MRefs),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
+monitor_wait([]) ->
+ ok;
+monitor_wait([MRef | MRefs]) ->
+ receive({'DOWN', MRef, process, _Pid, _Info}) ->
+ ok
+ end,
+ monitor_wait(MRefs).
+
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e412fbbc..03fafc3e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
ok;
handle_msg([SPid], _From, {process_death, Pid}) ->
inform_deaths(SPid, [Pid]);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, {gm, Msg}),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Deaths) ->
- rabbit_misc:with_exit_handler(
- fun () -> {stop, normal} end,
- fun () ->
- case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
- ok ->
- ok;
- {promote, CPid} ->
- {become, rabbit_mirror_queue_coordinator, [CPid]}
- end
- end).
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
%% ---------------------------------------------------------------------------
%% Others
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b773f83b..bd5cf588 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -313,8 +313,8 @@ handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
mainloop(Deb, State);
-handle_other(timeout, _Deb, #v1{connection_state = S}) ->
- throw({timeout, S});
+handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+ throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -683,7 +683,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun = fun() -> Parent ! timeout end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -736,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.