diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-14 15:41:16 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-14 15:41:16 +0100 |
commit | a4e68f31bded4a6bd2e0a397bfece3c0a7700879 (patch) | |
tree | 22c0c87031e79768ff710f499c0a04be987e02c4 | |
parent | 4d2109231f30d60cc41ee3ee512fca3ed7b8e734 (diff) | |
parent | 9f0212fe7467e309c0cd59815be9a59a49455422 (diff) | |
download | rabbitmq-server-a4e68f31bded4a6bd2e0a397bfece3c0a7700879.tar.gz |
merge bug24930
-rw-r--r-- | ebin/rabbit_app.in | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rabbitmq-server.init | 9 | ||||
-rw-r--r-- | src/gm.erl | 87 | ||||
-rw-r--r-- | src/rabbit.erl | 2 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 29 | ||||
-rw-r--r-- | src/rabbit_disk_monitor.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 8 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 2 |
8 files changed, 63 insertions, 80 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index b7d14f20..ffe112a0 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -22,6 +22,8 @@ {disk_free_limit, {mem_relative, 1.0}}, {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, + %% 0 ("no limit") would make a better default, but that + %% breaks the QPid Java client {frame_max, 131072}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 262144}, diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init index d14eebf2..b2d3f86a 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.init +++ b/packaging/debs/Debian/debian/rabbitmq-server.init @@ -140,6 +140,7 @@ start_stop_end() { 3) log_warning_msg "${DESC} already ${1}" log_end_msg 0 + RETVAL=0 ;; *) log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}" @@ -150,12 +151,12 @@ start_stop_end() { case "$1" in start) - log_daemon_msg "Starting ${DESC}:" $NAME + log_daemon_msg "Starting ${DESC}" $NAME start_rabbitmq start_stop_end "running" ;; stop) - log_daemon_msg "Stopping ${DESC}:" $NAME + log_daemon_msg "Stopping ${DESC}" $NAME stop_rabbitmq start_stop_end "stopped" ;; @@ -168,12 +169,12 @@ case "$1" in log_action_end_msg $RETVAL ;; force-reload|reload|restart) - log_daemon_msg "Restarting ${DESC}:" $NAME + log_daemon_msg "Restarting ${DESC}" $NAME restart_rabbitmq restart_end ;; try-restart) - log_daemon_msg "Restarting ${DESC}:" $NAME + log_daemon_msg "Restarting ${DESC}" $NAME restart_running_rabbitmq restart_end ;; @@ -876,11 +876,9 @@ flush_broadcast_buffer(State = #state { self = Self, %% View construction and inspection %% --------------------------------------------------------------------------- -needs_view_update(ReqVer, {Ver, _View}) -> - Ver < ReqVer. +needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. -view_version({Ver, _View}) -> - Ver. +view_version({Ver, _View}) -> Ver. is_member_alive({dead, _Member}) -> false; is_member_alive(_) -> true. @@ -899,17 +897,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> - ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). -find_view_member(Id, {_Ver, View}) -> - ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). -blank_view(Ver) -> - {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, ?DICT:new()}. -alive_view_members({_Ver, View}) -> - ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> ?DICT:fold( @@ -1150,10 +1144,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> end, {Neighbour, maybe_monitor(Neighbour, Self)}. -maybe_monitor(Self, Self) -> - undefined; -maybe_monitor(Other, _Self) -> - erlang:monitor(process, get_pid(Other)). +maybe_monitor( Self, Self) -> undefined; +maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1242,23 +1234,19 @@ find_member_or_blank(Id, MembersState) -> error -> blank_member() end. -erase_member(Id, MembersState) -> - ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> - ?DICT:new(). +blank_member_state() -> ?DICT:new(). store_member(Id, MemberState, MembersState) -> ?DICT:store(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> - ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). -build_members_state(MembersStateList) -> - ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> {case read_group(GroupName) of @@ -1280,16 +1268,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% Activity assembly %% --------------------------------------------------------------------------- -activity_nil() -> - queue:new(). +activity_nil() -> queue:new(). -activity_cons(_Id, [], [], Tail) -> - Tail; -activity_cons(Sender, Pubs, Acks, Tail) -> - queue:in({Sender, Pubs, Acks}, Tail). +activity_cons( _Id, [], [], Tail) -> Tail; +activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). -activity_finalise(Activity) -> - queue:to_list(Activity). +activity_finalise(Activity) -> queue:to_list(Activity). maybe_send_activity([], _State) -> ok; @@ -1393,34 +1377,25 @@ purge_confirms(Confirms) -> %% Msg transformation %% --------------------------------------------------------------------------- -acks_from_queue(Q) -> - [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. +acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. -pubs_from_queue(Q) -> - queue:to_list(Q). +pubs_from_queue(Q) -> queue:to_list(Q). -queue_from_pubs(Pubs) -> - queue:from_list(Pubs). +queue_from_pubs(Pubs) -> queue:from_list(Pubs). -apply_acks([], Pubs) -> - Pubs; -apply_acks(List, Pubs) -> - {_, Pubs1} = queue:split(length(List), Pubs), - Pubs1. +apply_acks( [], Pubs) -> Pubs; +apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. join_pubs(Q, []) -> Q; join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). -last_ack([], LA) -> - LA; -last_ack(List, LA) -> - LA1 = lists:last(List), - true = LA1 > LA, %% ASSERTION - LA1. - -last_pub([], LP) -> - LP; -last_pub(List, LP) -> - {PubNum, _Msg} = lists:last(List), - true = PubNum > LP, %% ASSERTION - PubNum. +last_ack( [], LA) -> LA; +last_ack(List, LA) -> LA1 = lists:last(List), + true = LA1 > LA, %% ASSERTION + LA1. + +last_pub( [], LP) -> LP; +last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), + true = PubNum > LP, %% ASSERTION + PubNum. diff --git a/src/rabbit.erl b/src/rabbit.erl index fc5d4e93..fda489fe 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -323,8 +323,6 @@ boot() -> start_it(StartFun) -> try StartFun() - catch _:Reason -> - boot_error("Error description:~n~n~p~n~n", [Reason]) after %% give the error loggers some time to catch up timer:sleep(100) diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 4724555b..2e163cfb 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -490,12 +490,14 @@ wait_for_process_death(Pid) -> read_pid_file(PidFile, Wait) -> case {file:read_file(PidFile), Wait} of {{ok, Bin}, _} -> - S = string:strip(binary_to_list(Bin), right, $\n), - try list_to_integer(S) + S = binary_to_list(Bin), + {match, [PidS]} = re:run(S, "[^\\s]+", + [{capture, all, list}]), + try list_to_integer(PidS) catch error:badarg -> exit({error, {garbage_in_pid_file, PidFile}}) end, - S; + PidS; {{error, enoent}, true} -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), read_pid_file(PidFile, Wait); @@ -507,8 +509,7 @@ read_pid_file(PidFile, Wait) -> % rpc:call(os, getpid, []) at this point process_up(Pid) -> with_os([{unix, fun () -> - system("ps -p " ++ Pid - ++ " >/dev/null 2>&1") =:= 0 + run_ps(Pid) =:= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -526,15 +527,17 @@ with_os(Handlers) -> Handler -> Handler() end. -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. +run_ps(Pid) -> + Port = erlang:open_port({spawn, "ps -p " ++ Pid}, + [exit_status, {line, 16384}, + use_stdio, stderr_to_stdout]), + exit_loop(Port). -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). +exit_loop(Port) -> + receive + {Port, {exit_status, Rc}} -> Rc; + {Port, _} -> exit_loop(Port) + end. format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index d9e8e8e4..ed29bd80 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) -> parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"])); -get_disk_free(_, _) -> - unknown. +get_disk_free(_, Platform) -> + {unknown, Platform}. parse_free_unix(CommandResult) -> [_, Stats | _] = string:tokens(CommandResult, "\n"), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 17e2ffb4..71e0507a 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> noreply(State); handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> - noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); + +handle_cast({delete_and_terminate, Reason}, State) -> + {stop, Reason, State}. handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), @@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); +handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) -> + ok = gen_server2:cast(CPid, Msg), + {stop, Reason}; handle_msg([_CPid], _From, _Msg) -> ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b773f83b..07b39d8c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -736,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -%% Compute frame_max for this instance. Could simply use 0, but breaks -%% QPid Java client. server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. |