diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-06-29 13:51:09 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-06-29 13:51:09 +0100 |
commit | 1791a50cfb0a9db025b74c701744e0dd45c91ef7 (patch) | |
tree | d3b64592b4a5418adb1d63fb6d355577a2c6f9c7 | |
parent | a82ee24122224a9d75f7b795d638145f9f6de49b (diff) | |
parent | a6fc80b7caef30f0a157add3aa56aa9a4ff9a8c5 (diff) | |
download | rabbitmq-server-1791a50cfb0a9db025b74c701744e0dd45c91ef7.tar.gz |
Merged bug24826 into default
-rw-r--r-- | ebin/rabbit_app.in | 4 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/Makefile | 1 | ||||
-rw-r--r-- | packaging/common/rabbitmq-script-wrapper | 4 | ||||
-rw-r--r-- | packaging/debs/Debian/Makefile | 1 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rabbitmq-server.init | 14 | ||||
-rwxr-xr-x | scripts/rabbitmq-plugins.bat | 6 | ||||
-rwxr-xr-x | scripts/rabbitmq-server.bat | 4 | ||||
-rwxr-xr-x | scripts/rabbitmq-service.bat | 5 | ||||
-rw-r--r-- | src/gm.erl | 97 | ||||
-rw-r--r-- | src/rabbit.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 5 | ||||
-rw-r--r-- | src/rabbit_disk_monitor.erl | 6 | ||||
-rw-r--r-- | src/rabbit_file.erl | 17 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 17 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 8 |
18 files changed, 113 insertions, 105 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index b7d14f20..523b54ce 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -19,9 +19,11 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, - {disk_free_limit, {mem_relative, 1.0}}, + {disk_free_limit, 1000000000}, %% 1GB {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, + %% 0 ("no limit") would make a better default, but that + %% breaks the QPid Java client {frame_max, 131072}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 262144}, diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 180500ed..03e513f8 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -42,6 +42,7 @@ ifeq "$(RPM_OS)" "fedora" SOURCES/rabbitmq-server.init endif sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + -e 's|@STDOUT_STDERR_REDIRECTION@||' \ SOURCES/rabbitmq-script-wrapper cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 0e59c218..e832aed6 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -29,7 +29,9 @@ cd /var/lib/rabbitmq SCRIPT=`basename $0` -if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then +if [ `id -u` = `id -u rabbitmq` -a "$SCRIPT" = "rabbitmq-server" ] ; then + /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@ +elif [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then /usr/lib/rabbitmq/bin/${SCRIPT} "$@" elif [ `id -u` = 0 ] ; then @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 844388c6..1e4bf755 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -23,6 +23,7 @@ package: clean cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + -e 's|@STDOUT_STDERR_REDIRECTION@| > "/var/log/rabbitmq/startup_log" 2> "/var/log/rabbitmq/startup_err"|' \ $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper chmod a+x $(UNPACKED_DIR)/debian/rules echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init index 4bc32515..b2d3f86a 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.init +++ b/packaging/debs/Debian/debian/rabbitmq-server.init @@ -60,10 +60,7 @@ start_rabbitmq () { set +e RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \ --chuid rabbitmq --start --exec $DAEMON \ - --pidfile "$RABBITMQ_PID_FILE" \ - > "${INIT_LOG_DIR}/startup_log" \ - 2> "${INIT_LOG_DIR}/startup_err" \ - 0<&- & + --pidfile "$RABBITMQ_PID_FILE" --background $CONTROL wait $PID_FILE >/dev/null 2>&1 RETVAL=$? set -e @@ -143,6 +140,7 @@ start_stop_end() { 3) log_warning_msg "${DESC} already ${1}" log_end_msg 0 + RETVAL=0 ;; *) log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}" @@ -153,12 +151,12 @@ start_stop_end() { case "$1" in start) - log_daemon_msg "Starting ${DESC}:" $NAME + log_daemon_msg "Starting ${DESC}" $NAME start_rabbitmq start_stop_end "running" ;; stop) - log_daemon_msg "Stopping ${DESC}:" $NAME + log_daemon_msg "Stopping ${DESC}" $NAME stop_rabbitmq start_stop_end "stopped" ;; @@ -171,12 +169,12 @@ case "$1" in log_action_end_msg $RETVAL ;; force-reload|reload|restart) - log_daemon_msg "Restarting ${DESC}:" $NAME + log_daemon_msg "Restarting ${DESC}" $NAME restart_rabbitmq restart_end ;; try-restart) - log_daemon_msg "Restarting ${DESC}:" $NAME + log_daemon_msg "Restarting ${DESC}" $NAME restart_running_rabbitmq restart_end ;; diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 3c268726..c67a0263 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -43,9 +43,9 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-)
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b8822739..167f272e 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -86,8 +86,8 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 849bedcf..4758c861 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -154,7 +154,10 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
if "!RABBITMQ_CONFIG_FILE!"=="" (
@@ -558,7 +558,7 @@ handle_call(group_members, _From, reply(not_joined, State); handle_call(group_members, _From, State = #state { view = View }) -> - reply(alive_view_members(View), State); + reply(get_pids(alive_view_members(View)), State); handle_call({add_on_right, _NewMember}, _From, State = #state { members_state = undefined }) -> @@ -647,7 +647,7 @@ handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); -handle_info({'DOWN', MRef, process, _Pid, _Reason}, +handle_info({'DOWN', MRef, process, _Pid, Reason}, State = #state { self = Self, left = Left, right = Right, @@ -661,8 +661,10 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, {_, {Member1, MRef}} -> Member1; _ -> undefined end, - case Member of - undefined -> + case {Member, Reason} of + {undefined, _} -> + noreply(State); + {_, {shutdown, ring_shutdown}} -> noreply(State); _ -> View1 = @@ -876,11 +878,9 @@ flush_broadcast_buffer(State = #state { self = Self, %% View construction and inspection %% --------------------------------------------------------------------------- -needs_view_update(ReqVer, {Ver, _View}) -> - Ver < ReqVer. +needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. -view_version({Ver, _View}) -> - Ver. +view_version({Ver, _View}) -> Ver. is_member_alive({dead, _Member}) -> false; is_member_alive(_) -> true. @@ -899,17 +899,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> - ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). -find_view_member(Id, {_Ver, View}) -> - ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). -blank_view(Ver) -> - {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, ?DICT:new()}. -alive_view_members({_Ver, View}) -> - ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> ?DICT:fold( @@ -1150,10 +1146,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> end, {Neighbour, maybe_monitor(Neighbour, Self)}. -maybe_monitor(Self, Self) -> - undefined; -maybe_monitor(Other, _Self) -> - erlang:monitor(process, get_pid(Other)). +maybe_monitor( Self, Self) -> undefined; +maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1242,23 +1236,19 @@ find_member_or_blank(Id, MembersState) -> error -> blank_member() end. -erase_member(Id, MembersState) -> - ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> - ?DICT:new(). +blank_member_state() -> ?DICT:new(). store_member(Id, MemberState, MembersState) -> ?DICT:store(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> - ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). -build_members_state(MembersStateList) -> - ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> {case read_group(GroupName) of @@ -1280,16 +1270,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% Activity assembly %% --------------------------------------------------------------------------- -activity_nil() -> - queue:new(). +activity_nil() -> queue:new(). -activity_cons(_Id, [], [], Tail) -> - Tail; -activity_cons(Sender, Pubs, Acks, Tail) -> - queue:in({Sender, Pubs, Acks}, Tail). +activity_cons( _Id, [], [], Tail) -> Tail; +activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). -activity_finalise(Activity) -> - queue:to_list(Activity). +activity_finalise(Activity) -> queue:to_list(Activity). maybe_send_activity([], _State) -> ok; @@ -1393,34 +1379,25 @@ purge_confirms(Confirms) -> %% Msg transformation %% --------------------------------------------------------------------------- -acks_from_queue(Q) -> - [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. +acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. -pubs_from_queue(Q) -> - queue:to_list(Q). +pubs_from_queue(Q) -> queue:to_list(Q). -queue_from_pubs(Pubs) -> - queue:from_list(Pubs). +queue_from_pubs(Pubs) -> queue:from_list(Pubs). -apply_acks([], Pubs) -> - Pubs; -apply_acks(List, Pubs) -> - {_, Pubs1} = queue:split(length(List), Pubs), - Pubs1. +apply_acks( [], Pubs) -> Pubs; +apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. join_pubs(Q, []) -> Q; join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). -last_ack([], LA) -> - LA; -last_ack(List, LA) -> - LA1 = lists:last(List), - true = LA1 > LA, %% ASSERTION - LA1. - -last_pub([], LP) -> - LP; -last_pub(List, LP) -> - {PubNum, _Msg} = lists:last(List), - true = PubNum > LP, %% ASSERTION - PubNum. +last_ack( [], LA) -> LA; +last_ack(List, LA) -> LA1 = lists:last(List), + true = LA1 > LA, %% ASSERTION + LA1. + +last_pub( [], LP) -> LP; +last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), + true = PubNum > LP, %% ASSERTION + PubNum. diff --git a/src/rabbit.erl b/src/rabbit.erl index fc5d4e93..fda489fe 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -323,8 +323,6 @@ boot() -> start_it(StartFun) -> try StartFun() - catch _:Reason -> - boot_error("Error description:~n~n~p~n~n", [Reason]) after %% give the error loggers some time to catch up timer:sleep(100) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 637a7d4d..afbaea65 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -171,6 +171,9 @@ [queue_name, channel_pid, consumer_tag, ack_required]). start() -> + %% Clear out remnants of old incarnation, in case we restarted + %% faster than other nodes handled DOWN messages from us. + on_node_down(node()), DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), @@ -530,7 +533,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, @@ -595,7 +598,8 @@ on_node_down(Node) -> #amqqueue{name = QName, pid = Pid, slave_pids = []} <- mnesia:table(rabbit_queue), - node(Pid) == Node])), + node(Pid) == Node andalso + not is_process_alive(Pid)])), {Qs, Dels} = lists:unzip(QsDels), T = rabbit_binding:process_deletions( lists:foldl(fun rabbit_binding:combine_deletions/2, diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 2e163cfb..b23088cc 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -156,6 +156,11 @@ start() -> {'EXIT', {badarg, _}} -> print_error("invalid parameter: ~p", [Args]), usage(); + {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) -> + %% We handle this common case specially to avoid ~p since + %% that has i18n issues + print_error("~s: ~s", [Problem, Reason]), + rabbit_misc:quit(2); {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index d9e8e8e4..58375abb 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -27,7 +27,7 @@ set_check_interval/1, get_disk_free/0]). -define(SERVER, ?MODULE). --define(DEFAULT_DISK_CHECK_INTERVAL, 60000). +-define(DEFAULT_DISK_CHECK_INTERVAL, 10000). -record(state, {dir, limit, @@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) -> parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"])); -get_disk_free(_, _) -> - unknown. +get_disk_free(_, Platform) -> + {unknown, Platform}. parse_free_unix(CommandResult) -> [_, Stats | _] = string:tokens(CommandResult, "\n"), diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 59df14f3..a95f8f26 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -102,9 +102,12 @@ read_file_info(File) -> with_fhc_handle(fun () -> prim_file:read_file_info(File) end). with_fhc_handle(Fun) -> - ok = file_handle_cache:obtain(), + with_fhc_handle(1, Fun). + +with_fhc_handle(N, Fun) -> + [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)], try Fun() - after ok = file_handle_cache:release() + after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)] end. read_term_file(File) -> @@ -165,7 +168,7 @@ make_binary(List) -> {error, Reason} end. - +%% TODO the semantics of this function are rather odd. But see bug 25021. append_file(File, Suffix) -> case read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -183,9 +186,11 @@ append_file(File, 0, Suffix) -> end end); append_file(File, _, Suffix) -> - case with_fhc_handle(fun () -> prim_file:read_file(File) end) of - {ok, Data} -> write_file([File, Suffix], Data, [append]); - Error -> Error + case with_fhc_handle(2, fun () -> + file:copy(File, {[File, Suffix], [append]}) + end) of + {ok, _BytesCopied} -> ok; + Error -> Error end. ensure_parent_dirs_exist(Filename) -> diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 17e2ffb4..3e058793 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> noreply(State); handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> - noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); + +handle_cast({delete_and_terminate, Reason}, State) -> + {stop, Reason, State}. handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), @@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); +handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> + ok = gen_server2:cast(CPid, Msg), + {stop, {shutdown, ring_shutdown}}; handle_msg([_CPid], _From, _Msg) -> ok. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4e71cc43..750bcd56 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -127,10 +127,21 @@ terminate(Reason, delete_and_terminate(Reason, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> + Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], + MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + monitor_wait(MRefs), State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }. +monitor_wait([]) -> + ok; +monitor_wait([MRef | MRefs]) -> + receive({'DOWN', MRef, process, _Pid, _Info}) -> + ok + end, + monitor_wait(MRefs). + purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index e412fbbc..03fafc3e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> ok; handle_msg([SPid], _From, {process_death, Pid}) -> inform_deaths(SPid, [Pid]); +handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> + ok = gen_server2:cast(CPid, {gm, Msg}), + {stop, {shutdown, ring_shutdown}}; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). inform_deaths(SPid, Deaths) -> - rabbit_misc:with_exit_handler( - fun () -> {stop, normal} end, - fun () -> - case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of - ok -> - ok; - {promote, CPid} -> - {become, rabbit_mirror_queue_coordinator, [CPid]} - end - end). + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> ok; + {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} + end. %% --------------------------------------------------------------------------- %% Others diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b773f83b..bd5cf588 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -313,8 +313,8 @@ handle_other(handshake_timeout, _Deb, State) -> throw({handshake_timeout, State#v1.callback}); handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> mainloop(Deb, State); -handle_other(timeout, _Deb, #v1{connection_state = S}) -> - throw({timeout, S}); +handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> + throw({heartbeat_timeout, S}); handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -683,7 +683,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), - ReceiveFun = fun() -> Parent ! timeout end, + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, @@ -736,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -%% Compute frame_max for this instance. Could simply use 0, but breaks -%% QPid Java client. server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. |