summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim.watson@gmail.com>2012-06-14 15:23:31 +0100
committerTim Watson <tim.watson@gmail.com>2012-06-14 15:23:31 +0100
commit3cfc2be9a9ca06314367eff69f0295b5135e7bb4 (patch)
tree2c74656b0f5d38c8acdcd9afa6093fd20bbcdf5f
parent150b908aab7b09843674bc7a6e10eb9a19ea6362 (diff)
parent0bcd6f7a5d73086353512bccae6564efd40596f5 (diff)
downloadrabbitmq-server-3cfc2be9a9ca06314367eff69f0295b5135e7bb4.tar.gz
merge default
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init15
-rwxr-xr-xscripts/rabbitmq-plugins.bat4
-rwxr-xr-xscripts/rabbitmq-server.bat5
-rw-r--r--src/gm.erl87
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_control_main.erl29
-rw-r--r--src/rabbit_disk_monitor.erl4
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl8
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_tests.erl47
11 files changed, 98 insertions, 110 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index b7d14f20..ffe112a0 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -22,6 +22,8 @@
{disk_free_limit, {mem_relative, 1.0}},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ %% 0 ("no limit") would make a better default, but that
+ %% breaks the QPid Java client
{frame_max, 131072},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
index f514b974..c1352078 100644
--- a/packaging/debs/Debian/debian/rabbitmq-server.init
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -137,13 +137,18 @@ restart_end() {
start_stop_end() {
case "$RETVAL" in
0)
- log_end_msg 0;;
+ [ -x /sbin/initctl ] && /sbin/initctl emit --no-wait "${NAME}-${1}"
+ log_end_msg 0
+ ;;
3)
log_warning_msg "${DESC} already ${1}"
- log_end_msg 0;;
+ log_end_msg 0
+ RETVAL=0
+ ;;
*)
log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}"
- log_end_msg 1;;
+ log_end_msg 1
+ ;;
esac
}
@@ -151,7 +156,7 @@ case "$1" in
start)
log_daemon_msg "Starting ${DESC}" $NAME
start_rabbitmq
- start_stop_end "started"
+ start_stop_end "running"
;;
stop)
log_daemon_msg "Stopping ${DESC}" $NAME
@@ -162,7 +167,7 @@ case "$1" in
status_rabbitmq
;;
rotate-logs)
- log_action_begin_msg "Rotating log files for ${DESC} ${NAME}"
+ log_action_begin_msg "Rotating log files for ${DESC}: ${NAME}"
rotate_logs_rabbitmq
log_action_end_msg $RETVAL
;;
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index bc198393..3c268726 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -43,7 +43,9 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 09d4661f..b8822739 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -86,7 +86,10 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
diff --git a/src/gm.erl b/src/gm.erl
index 97c81ec6..30fcdc5d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -876,11 +876,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -899,17 +897,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1150,10 +1144,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1242,23 +1234,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1280,16 +1268,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1393,34 +1377,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b7ba7144..fda489fe 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -199,7 +199,8 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
+ ssl_connection, ssl_record, gen_fsm, ssl]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -263,7 +264,7 @@ maybe_hipe_compile() ->
hipe_compile() ->
Count = length(?HIPE_WORTHY),
- io:format("HiPE compiling: |~s|~n |",
+ io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
T1 = erlang:now(),
PidMRefs = [spawn_monitor(fun () -> [begin
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index f8b8c345..2e163cfb 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -490,12 +490,14 @@ wait_for_process_death(Pid) ->
read_pid_file(PidFile, Wait) ->
case {file:read_file(PidFile), Wait} of
{{ok, Bin}, _} ->
- S = re:replace(Bin, "\\s", "", [global, {return, list}]),
- try list_to_integer(S)
+ S = binary_to_list(Bin),
+ {match, [PidS]} = re:run(S, "[^\\s]+",
+ [{capture, all, list}]),
+ try list_to_integer(PidS)
catch error:badarg ->
exit({error, {garbage_in_pid_file, PidFile}})
end,
- S;
+ PidS;
{{error, enoent}, true} ->
timer:sleep(?EXTERNAL_CHECK_INTERVAL),
read_pid_file(PidFile, Wait);
@@ -507,8 +509,7 @@ read_pid_file(PidFile, Wait) ->
% rpc:call(os, getpid, []) at this point
process_up(Pid) ->
with_os([{unix, fun () ->
- system("ps -p " ++ Pid
- ++ " >/dev/null 2>&1") =:= 0
+ run_ps(Pid) =:= 0
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
@@ -526,15 +527,17 @@ with_os(Handlers) ->
Handler -> Handler()
end.
-% Like system(3)
-system(Cmd) ->
- ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
- Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
- receive {Port, {exit_status, Status}} -> Status end.
+run_ps(Pid) ->
+ Port = erlang:open_port({spawn, "ps -p " ++ Pid},
+ [exit_status, {line, 16384},
+ use_stdio, stderr_to_stdout]),
+ exit_loop(Port).
-% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
-escape_quotes(Cmd) ->
- lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+exit_loop(Port) ->
+ receive
+ {Port, {exit_status, Rc}} -> Rc;
+ {Port, _} -> exit_loop(Port)
+ end.
format_parse_error({_Line, Mod, Err}) ->
lists:flatten(Mod:format_error(Err)).
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index d9e8e8e4..ed29bd80 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"]));
-get_disk_free(_, _) ->
- unknown.
+get_disk_free(_, Platform) ->
+ {unknown, Platform}.
parse_free_unix(CommandResult) ->
[_, Stats | _] = string:tokens(CommandResult, "\n"),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 17e2ffb4..71e0507a 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
+
+handle_cast({delete_and_terminate, Reason}, State) ->
+ {stop, Reason, State}.
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
@@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
+handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg),
+ {stop, Reason};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 707e576f..c675bc4e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -737,8 +737,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2760ef2d..5545cccf 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -29,6 +29,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
+-define(TIMEOUT, 5000).
all_tests() ->
passed = gm_tests:all_tests(),
@@ -1240,7 +1241,7 @@ test_spawn() ->
rabbit_limiter:make_token(self())),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_channel_open_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
end,
{Writer, Ch}.
@@ -1261,7 +1262,7 @@ test_spawn_remote() ->
end
end),
receive Res -> Res
- after 1000 -> throw(failed_to_receive_result)
+ after ?TIMEOUT -> throw(failed_to_receive_result)
end.
user(Username) ->
@@ -1281,13 +1282,10 @@ test_confirms() ->
queue = Q0,
exchange = <<"amq.direct">>,
routing_key = "magic" }),
- receive #'queue.bind_ok'{} ->
- Q0
- after 1000 ->
- throw(failed_to_bind_queue)
+ receive #'queue.bind_ok'{} -> Q0
+ after ?TIMEOUT -> throw(failed_to_bind_queue)
end
- after 1000 ->
- throw(failed_to_declare_queue)
+ after ?TIMEOUT -> throw(failed_to_declare_queue)
end
end,
%% Declare and bind two queues
@@ -1300,7 +1298,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
#'confirm.select_ok'{} -> ok
- after 1000 -> throw(failed_to_enable_confirms)
+ after ?TIMEOUT -> throw(failed_to_enable_confirms)
end,
%% Publish a message
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
@@ -1317,7 +1315,7 @@ test_confirms() ->
receive
#'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(received_ack_instead_of_nack)
- after 2000 -> throw(did_not_receive_nack)
+ after ?TIMEOUT-> throw(did_not_receive_nack)
end,
receive
#'basic.ack'{} -> throw(received_ack_when_none_expected)
@@ -1327,7 +1325,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
receive
#'queue.delete_ok'{} -> ok
- after 1000 -> throw(failed_to_cleanup_queue)
+ after ?TIMEOUT -> throw(failed_to_cleanup_queue)
end,
unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
@@ -1350,7 +1348,7 @@ test_statistics_receive_event1(Ch, Matcher) ->
true -> Props;
_ -> test_statistics_receive_event1(Ch, Matcher)
end
- after 1000 -> throw(failed_to_receive_event)
+ after ?TIMEOUT -> throw(failed_to_receive_event)
end.
test_statistics() ->
@@ -1362,9 +1360,8 @@ test_statistics() ->
%% Set up a channel and queue
{_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{}),
- QName = receive #'queue.declare_ok'{queue = Q0} ->
- Q0
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
QPid = Q#amqqueue.pid,
@@ -1444,7 +1441,7 @@ expect_event(Pid, Type) ->
Pid -> ok;
_ -> expect_event(Pid, Type)
end
- after 1000 -> throw({failed_to_receive_event, Type})
+ after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
test_delegates_async(SecondaryNode) ->
@@ -1468,7 +1465,7 @@ make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
fun () ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(Throw)
+ after ?TIMEOUT -> throw(Throw)
end
end.
@@ -1481,9 +1478,7 @@ await_response(Count) ->
receive
response -> ok,
await_response(Count - 1)
- after 1000 ->
- io:format("Async reply not received~n"),
- throw(timeout)
+ after ?TIMEOUT -> throw(timeout)
end.
must_exit(Fun) ->
@@ -1550,7 +1545,7 @@ test_queue_cleanup(_SecondaryNode) ->
rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
ok
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
rabbit_channel:shutdown(Ch),
rabbit:stop(),
@@ -1561,8 +1556,7 @@ test_queue_cleanup(_SecondaryNode) ->
receive
#'channel.close'{reply_code = ?NOT_FOUND} ->
ok
- after 2000 ->
- throw(failed_to_receive_channel_exit)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_exit)
end,
rabbit_channel:shutdown(Ch2),
passed.
@@ -1589,8 +1583,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
{ok, 0} = rabbit_amqqueue:delete(Q, false, false),
passed
- after 2000 ->
- throw(failed_to_create_and_kill_queue)
+ after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
%%---------------------------------------------------------------------
@@ -1821,7 +1814,7 @@ on_disk_capture(OnDisk, Awaiting, Pid) ->
Pid);
stop ->
done
- after (case Awaiting of [] -> 200; _ -> 1000 end) ->
+ after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) ->
case Awaiting of
[] -> Pid ! {self(), arrived}, on_disk_capture();
_ -> Pid ! {self(), timeout}
@@ -2374,7 +2367,7 @@ wait_for_confirms(Unconfirmed) ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)))
- after 5000 -> exit(timeout_waiting_for_confirm)
+ after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
end
end.