diff options
author | Tim Watson <tim.watson@gmail.com> | 2012-06-14 15:23:31 +0100 |
---|---|---|
committer | Tim Watson <tim.watson@gmail.com> | 2012-06-14 15:23:31 +0100 |
commit | 3cfc2be9a9ca06314367eff69f0295b5135e7bb4 (patch) | |
tree | 2c74656b0f5d38c8acdcd9afa6093fd20bbcdf5f | |
parent | 150b908aab7b09843674bc7a6e10eb9a19ea6362 (diff) | |
parent | 0bcd6f7a5d73086353512bccae6564efd40596f5 (diff) | |
download | rabbitmq-server-3cfc2be9a9ca06314367eff69f0295b5135e7bb4.tar.gz |
merge default
-rw-r--r-- | ebin/rabbit_app.in | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rabbitmq-server.init | 15 | ||||
-rwxr-xr-x | scripts/rabbitmq-plugins.bat | 4 | ||||
-rwxr-xr-x | scripts/rabbitmq-server.bat | 5 | ||||
-rw-r--r-- | src/gm.erl | 87 | ||||
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 29 | ||||
-rw-r--r-- | src/rabbit_disk_monitor.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 8 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 47 |
11 files changed, 98 insertions, 110 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index b7d14f20..ffe112a0 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -22,6 +22,8 @@ {disk_free_limit, {mem_relative, 1.0}}, {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, + %% 0 ("no limit") would make a better default, but that + %% breaks the QPid Java client {frame_max, 131072}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 262144}, diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init index f514b974..c1352078 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.init +++ b/packaging/debs/Debian/debian/rabbitmq-server.init @@ -137,13 +137,18 @@ restart_end() { start_stop_end() { case "$RETVAL" in 0) - log_end_msg 0;; + [ -x /sbin/initctl ] && /sbin/initctl emit --no-wait "${NAME}-${1}" + log_end_msg 0 + ;; 3) log_warning_msg "${DESC} already ${1}" - log_end_msg 0;; + log_end_msg 0 + RETVAL=0 + ;; *) log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}" - log_end_msg 1;; + log_end_msg 1 + ;; esac } @@ -151,7 +156,7 @@ case "$1" in start) log_daemon_msg "Starting ${DESC}" $NAME start_rabbitmq - start_stop_end "started" + start_stop_end "running" ;; stop) log_daemon_msg "Stopping ${DESC}" $NAME @@ -162,7 +167,7 @@ case "$1" in status_rabbitmq ;; rotate-logs) - log_action_begin_msg "Rotating log files for ${DESC} ${NAME}" + log_action_begin_msg "Rotating log files for ${DESC}: ${NAME}" rotate_logs_rabbitmq log_action_end_msg $RETVAL ;; diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index bc198393..3c268726 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -43,7 +43,9 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 09d4661f..b8822739 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -86,7 +86,10 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
@@ -876,11 +876,9 @@ flush_broadcast_buffer(State = #state { self = Self, %% View construction and inspection %% --------------------------------------------------------------------------- -needs_view_update(ReqVer, {Ver, _View}) -> - Ver < ReqVer. +needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. -view_version({Ver, _View}) -> - Ver. +view_version({Ver, _View}) -> Ver. is_member_alive({dead, _Member}) -> false; is_member_alive(_) -> true. @@ -899,17 +897,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> - ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). -find_view_member(Id, {_Ver, View}) -> - ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). -blank_view(Ver) -> - {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, ?DICT:new()}. -alive_view_members({_Ver, View}) -> - ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> ?DICT:fold( @@ -1150,10 +1144,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> end, {Neighbour, maybe_monitor(Neighbour, Self)}. -maybe_monitor(Self, Self) -> - undefined; -maybe_monitor(Other, _Self) -> - erlang:monitor(process, get_pid(Other)). +maybe_monitor( Self, Self) -> undefined; +maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1242,23 +1234,19 @@ find_member_or_blank(Id, MembersState) -> error -> blank_member() end. -erase_member(Id, MembersState) -> - ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> - ?DICT:new(). +blank_member_state() -> ?DICT:new(). store_member(Id, MemberState, MembersState) -> ?DICT:store(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> - ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). -build_members_state(MembersStateList) -> - ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> {case read_group(GroupName) of @@ -1280,16 +1268,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% Activity assembly %% --------------------------------------------------------------------------- -activity_nil() -> - queue:new(). +activity_nil() -> queue:new(). -activity_cons(_Id, [], [], Tail) -> - Tail; -activity_cons(Sender, Pubs, Acks, Tail) -> - queue:in({Sender, Pubs, Acks}, Tail). +activity_cons( _Id, [], [], Tail) -> Tail; +activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). -activity_finalise(Activity) -> - queue:to_list(Activity). +activity_finalise(Activity) -> queue:to_list(Activity). maybe_send_activity([], _State) -> ok; @@ -1393,34 +1377,25 @@ purge_confirms(Confirms) -> %% Msg transformation %% --------------------------------------------------------------------------- -acks_from_queue(Q) -> - [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. +acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. -pubs_from_queue(Q) -> - queue:to_list(Q). +pubs_from_queue(Q) -> queue:to_list(Q). -queue_from_pubs(Pubs) -> - queue:from_list(Pubs). +queue_from_pubs(Pubs) -> queue:from_list(Pubs). -apply_acks([], Pubs) -> - Pubs; -apply_acks(List, Pubs) -> - {_, Pubs1} = queue:split(length(List), Pubs), - Pubs1. +apply_acks( [], Pubs) -> Pubs; +apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. join_pubs(Q, []) -> Q; join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). -last_ack([], LA) -> - LA; -last_ack(List, LA) -> - LA1 = lists:last(List), - true = LA1 > LA, %% ASSERTION - LA1. - -last_pub([], LP) -> - LP; -last_pub(List, LP) -> - {PubNum, _Msg} = lists:last(List), - true = PubNum > LP, %% ASSERTION - PubNum. +last_ack( [], LA) -> LA; +last_ack(List, LA) -> LA1 = lists:last(List), + true = LA1 > LA, %% ASSERTION + LA1. + +last_pub( [], LP) -> LP; +last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), + true = PubNum > LP, %% ASSERTION + PubNum. diff --git a/src/rabbit.erl b/src/rabbit.erl index b7ba7144..fda489fe 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -199,7 +199,8 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, + ssl_connection, ssl_record, gen_fsm, ssl]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -263,7 +264,7 @@ maybe_hipe_compile() -> hipe_compile() -> Count = length(?HIPE_WORTHY), - io:format("HiPE compiling: |~s|~n |", + io:format("~nHiPE compiling: |~s|~n |", [string:copies("-", Count)]), T1 = erlang:now(), PidMRefs = [spawn_monitor(fun () -> [begin diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index f8b8c345..2e163cfb 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -490,12 +490,14 @@ wait_for_process_death(Pid) -> read_pid_file(PidFile, Wait) -> case {file:read_file(PidFile), Wait} of {{ok, Bin}, _} -> - S = re:replace(Bin, "\\s", "", [global, {return, list}]), - try list_to_integer(S) + S = binary_to_list(Bin), + {match, [PidS]} = re:run(S, "[^\\s]+", + [{capture, all, list}]), + try list_to_integer(PidS) catch error:badarg -> exit({error, {garbage_in_pid_file, PidFile}}) end, - S; + PidS; {{error, enoent}, true} -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), read_pid_file(PidFile, Wait); @@ -507,8 +509,7 @@ read_pid_file(PidFile, Wait) -> % rpc:call(os, getpid, []) at this point process_up(Pid) -> with_os([{unix, fun () -> - system("ps -p " ++ Pid - ++ " >/dev/null 2>&1") =:= 0 + run_ps(Pid) =:= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -526,15 +527,17 @@ with_os(Handlers) -> Handler -> Handler() end. -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. +run_ps(Pid) -> + Port = erlang:open_port({spawn, "ps -p " ++ Pid}, + [exit_status, {line, 16384}, + use_stdio, stderr_to_stdout]), + exit_loop(Port). -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). +exit_loop(Port) -> + receive + {Port, {exit_status, Rc}} -> Rc; + {Port, _} -> exit_loop(Port) + end. format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index d9e8e8e4..ed29bd80 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) -> parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"])); -get_disk_free(_, _) -> - unknown. +get_disk_free(_, Platform) -> + {unknown, Platform}. parse_free_unix(CommandResult) -> [_, Stats | _] = string:tokens(CommandResult, "\n"), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 17e2ffb4..71e0507a 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> noreply(State); handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> - noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); + +handle_cast({delete_and_terminate, Reason}, State) -> + {stop, Reason, State}. handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), @@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); +handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) -> + ok = gen_server2:cast(CPid, Msg), + {stop, Reason}; handle_msg([_CPid], _From, _Msg) -> ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 707e576f..c675bc4e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -737,8 +737,6 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -%% Compute frame_max for this instance. Could simply use 0, but breaks -%% QPid Java client. server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2760ef2d..5545cccf 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -29,6 +29,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). +-define(TIMEOUT, 5000). all_tests() -> passed = gm_tests:all_tests(), @@ -1240,7 +1241,7 @@ test_spawn() -> rabbit_limiter:make_token(self())), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok - after 1000 -> throw(failed_to_receive_channel_open_ok) + after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok) end, {Writer, Ch}. @@ -1261,7 +1262,7 @@ test_spawn_remote() -> end end), receive Res -> Res - after 1000 -> throw(failed_to_receive_result) + after ?TIMEOUT -> throw(failed_to_receive_result) end. user(Username) -> @@ -1281,13 +1282,10 @@ test_confirms() -> queue = Q0, exchange = <<"amq.direct">>, routing_key = "magic" }), - receive #'queue.bind_ok'{} -> - Q0 - after 1000 -> - throw(failed_to_bind_queue) + receive #'queue.bind_ok'{} -> Q0 + after ?TIMEOUT -> throw(failed_to_bind_queue) end - after 1000 -> - throw(failed_to_declare_queue) + after ?TIMEOUT -> throw(failed_to_declare_queue) end end, %% Declare and bind two queues @@ -1300,7 +1298,7 @@ test_confirms() -> rabbit_channel:do(Ch, #'confirm.select'{}), receive #'confirm.select_ok'{} -> ok - after 1000 -> throw(failed_to_enable_confirms) + after ?TIMEOUT -> throw(failed_to_enable_confirms) end, %% Publish a message rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>, @@ -1317,7 +1315,7 @@ test_confirms() -> receive #'basic.nack'{} -> ok; #'basic.ack'{} -> throw(received_ack_instead_of_nack) - after 2000 -> throw(did_not_receive_nack) + after ?TIMEOUT-> throw(did_not_receive_nack) end, receive #'basic.ack'{} -> throw(received_ack_when_none_expected) @@ -1327,7 +1325,7 @@ test_confirms() -> rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}), receive #'queue.delete_ok'{} -> ok - after 1000 -> throw(failed_to_cleanup_queue) + after ?TIMEOUT -> throw(failed_to_cleanup_queue) end, unlink(Ch), ok = rabbit_channel:shutdown(Ch), @@ -1350,7 +1348,7 @@ test_statistics_receive_event1(Ch, Matcher) -> true -> Props; _ -> test_statistics_receive_event1(Ch, Matcher) end - after 1000 -> throw(failed_to_receive_event) + after ?TIMEOUT -> throw(failed_to_receive_event) end. test_statistics() -> @@ -1362,9 +1360,8 @@ test_statistics() -> %% Set up a channel and queue {_Writer, Ch} = test_spawn(), rabbit_channel:do(Ch, #'queue.declare'{}), - QName = receive #'queue.declare_ok'{queue = Q0} -> - Q0 - after 1000 -> throw(failed_to_receive_queue_declare_ok) + QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), QPid = Q#amqqueue.pid, @@ -1444,7 +1441,7 @@ expect_event(Pid, Type) -> Pid -> ok; _ -> expect_event(Pid, Type) end - after 1000 -> throw({failed_to_receive_event, Type}) + after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. test_delegates_async(SecondaryNode) -> @@ -1468,7 +1465,7 @@ make_responder(FMsg) -> make_responder(FMsg, timeout). make_responder(FMsg, Throw) -> fun () -> receive Msg -> FMsg(Msg) - after 1000 -> throw(Throw) + after ?TIMEOUT -> throw(Throw) end end. @@ -1481,9 +1478,7 @@ await_response(Count) -> receive response -> ok, await_response(Count - 1) - after 1000 -> - io:format("Async reply not received~n"), - throw(timeout) + after ?TIMEOUT -> throw(timeout) end. must_exit(Fun) -> @@ -1550,7 +1545,7 @@ test_queue_cleanup(_SecondaryNode) -> rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> ok - after 1000 -> throw(failed_to_receive_queue_declare_ok) + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, rabbit_channel:shutdown(Ch), rabbit:stop(), @@ -1561,8 +1556,7 @@ test_queue_cleanup(_SecondaryNode) -> receive #'channel.close'{reply_code = ?NOT_FOUND} -> ok - after 2000 -> - throw(failed_to_receive_channel_exit) + after ?TIMEOUT -> throw(failed_to_receive_channel_exit) end, rabbit_channel:shutdown(Ch2), passed. @@ -1589,8 +1583,7 @@ test_declare_on_dead_queue(SecondaryNode) -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), {ok, 0} = rabbit_amqqueue:delete(Q, false, false), passed - after 2000 -> - throw(failed_to_create_and_kill_queue) + after ?TIMEOUT -> throw(failed_to_create_and_kill_queue) end. %%--------------------------------------------------------------------- @@ -1821,7 +1814,7 @@ on_disk_capture(OnDisk, Awaiting, Pid) -> Pid); stop -> done - after (case Awaiting of [] -> 200; _ -> 1000 end) -> + after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) -> case Awaiting of [] -> Pid ! {self(), arrived}, on_disk_capture(); _ -> Pid ! {self(), timeout} @@ -2374,7 +2367,7 @@ wait_for_confirms(Unconfirmed) -> wait_for_confirms( rabbit_misc:gb_sets_difference( Unconfirmed, gb_sets:from_list(Confirmed))) - after 5000 -> exit(timeout_waiting_for_confirm) + after ?TIMEOUT -> exit(timeout_waiting_for_confirm) end end. |