summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-06-14 15:41:16 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-06-14 15:41:16 +0100
commita4e68f31bded4a6bd2e0a397bfece3c0a7700879 (patch)
tree22c0c87031e79768ff710f499c0a04be987e02c4
parent4d2109231f30d60cc41ee3ee512fca3ed7b8e734 (diff)
parent9f0212fe7467e309c0cd59815be9a59a49455422 (diff)
downloadrabbitmq-server-a4e68f31bded4a6bd2e0a397bfece3c0a7700879.tar.gz
merge bug24930
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init9
-rw-r--r--src/gm.erl87
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_control_main.erl29
-rw-r--r--src/rabbit_disk_monitor.erl4
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl8
-rw-r--r--src/rabbit_reader.erl2
8 files changed, 63 insertions, 80 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index b7d14f20..ffe112a0 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -22,6 +22,8 @@
{disk_free_limit, {mem_relative, 1.0}},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ %% 0 ("no limit") would make a better default, but that
+ %% breaks the QPid Java client
{frame_max, 131072},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
index d14eebf2..b2d3f86a 100644
--- a/packaging/debs/Debian/debian/rabbitmq-server.init
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -140,6 +140,7 @@ start_stop_end() {
3)
log_warning_msg "${DESC} already ${1}"
log_end_msg 0
+ RETVAL=0
;;
*)
log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}"
@@ -150,12 +151,12 @@ start_stop_end() {
case "$1" in
start)
- log_daemon_msg "Starting ${DESC}:" $NAME
+ log_daemon_msg "Starting ${DESC}" $NAME
start_rabbitmq
start_stop_end "running"
;;
stop)
- log_daemon_msg "Stopping ${DESC}:" $NAME
+ log_daemon_msg "Stopping ${DESC}" $NAME
stop_rabbitmq
start_stop_end "stopped"
;;
@@ -168,12 +169,12 @@ case "$1" in
log_action_end_msg $RETVAL
;;
force-reload|reload|restart)
- log_daemon_msg "Restarting ${DESC}:" $NAME
+ log_daemon_msg "Restarting ${DESC}" $NAME
restart_rabbitmq
restart_end
;;
try-restart)
- log_daemon_msg "Restarting ${DESC}:" $NAME
+ log_daemon_msg "Restarting ${DESC}" $NAME
restart_running_rabbitmq
restart_end
;;
diff --git a/src/gm.erl b/src/gm.erl
index 97c81ec6..30fcdc5d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -876,11 +876,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -899,17 +897,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1150,10 +1144,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1242,23 +1234,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1280,16 +1268,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1393,34 +1377,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fc5d4e93..fda489fe 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -323,8 +323,6 @@ boot() ->
start_it(StartFun) ->
try
StartFun()
- catch _:Reason ->
- boot_error("Error description:~n~n~p~n~n", [Reason])
after
%% give the error loggers some time to catch up
timer:sleep(100)
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 4724555b..2e163cfb 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -490,12 +490,14 @@ wait_for_process_death(Pid) ->
read_pid_file(PidFile, Wait) ->
case {file:read_file(PidFile), Wait} of
{{ok, Bin}, _} ->
- S = string:strip(binary_to_list(Bin), right, $\n),
- try list_to_integer(S)
+ S = binary_to_list(Bin),
+ {match, [PidS]} = re:run(S, "[^\\s]+",
+ [{capture, all, list}]),
+ try list_to_integer(PidS)
catch error:badarg ->
exit({error, {garbage_in_pid_file, PidFile}})
end,
- S;
+ PidS;
{{error, enoent}, true} ->
timer:sleep(?EXTERNAL_CHECK_INTERVAL),
read_pid_file(PidFile, Wait);
@@ -507,8 +509,7 @@ read_pid_file(PidFile, Wait) ->
% rpc:call(os, getpid, []) at this point
process_up(Pid) ->
with_os([{unix, fun () ->
- system("ps -p " ++ Pid
- ++ " >/dev/null 2>&1") =:= 0
+ run_ps(Pid) =:= 0
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
@@ -526,15 +527,17 @@ with_os(Handlers) ->
Handler -> Handler()
end.
-% Like system(3)
-system(Cmd) ->
- ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
- Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
- receive {Port, {exit_status, Status}} -> Status end.
+run_ps(Pid) ->
+ Port = erlang:open_port({spawn, "ps -p " ++ Pid},
+ [exit_status, {line, 16384},
+ use_stdio, stderr_to_stdout]),
+ exit_loop(Port).
-% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
-escape_quotes(Cmd) ->
- lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+exit_loop(Port) ->
+ receive
+ {Port, {exit_status, Rc}} -> Rc;
+ {Port, _} -> exit_loop(Port)
+ end.
format_parse_error({_Line, Mod, Err}) ->
lists:flatten(Mod:format_error(Err)).
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index d9e8e8e4..ed29bd80 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"]));
-get_disk_free(_, _) ->
- unknown.
+get_disk_free(_, Platform) ->
+ {unknown, Platform}.
parse_free_unix(CommandResult) ->
[_, Stats | _] = string:tokens(CommandResult, "\n"),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 17e2ffb4..71e0507a 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
+
+handle_cast({delete_and_terminate, Reason}, State) ->
+ {stop, Reason, State}.
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
@@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
+handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg),
+ {stop, Reason};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b773f83b..07b39d8c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -736,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.