summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl12
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl62
-rw-r--r--src/rabbit_channel.erl14
-rw-r--r--src/rabbit_event.erl16
-rw-r--r--src/rabbit_msg_store.erl31
-rw-r--r--src/rabbit_reader.erl12
-rw-r--r--src/supervisor2.erl29
8 files changed, 84 insertions, 99 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 61b08d49..235e14c0 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -925,10 +925,10 @@ handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
update_counts(obtain, ToPid, +1,
- update_counts(obtain, FromPid, -1, State)))};
+ update_counts(obtain, FromPid, -1, State)))}.
-handle_cast(check_counts, State) ->
- {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}.
+handle_info(check_counts, State) ->
+ {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #fhc_state { elders = Elders,
@@ -1133,9 +1133,9 @@ reduce(State = #fhc_state { open_pending = OpenPending,
end
end,
case TRef of
- undefined -> {ok, TRef1} = timer:apply_after(
- ?FILE_HANDLES_CHECK_INTERVAL,
- gen_server, cast, [?SERVER, check_counts]),
+ undefined -> TRef1 = erlang:send_after(
+ ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER,
+ check_counts),
State #fhc_state { timer_ref = TRef1 };
_ -> State
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e9d01d12..6024db65 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,8 +33,7 @@
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
- emit_stats/1]).
+ set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -100,7 +99,6 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
@@ -405,9 +403,6 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-emit_stats(#amqqueue{pid = QPid}) ->
- delegate_cast(QPid, emit_stats).
-
delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c6019413..e787fa84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -262,10 +262,8 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
State#q{sync_timer_ref = undefined}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
+ TRef = erlang:send_after(
+ ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
@@ -277,13 +275,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ _ = erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ _ = erlang:cancel_timer(TRef),
State#q{expiry_timer_ref = undefined}.
%% We wish to expire only when there are no consumers *and* the expiry
@@ -295,18 +293,16 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
true ->
NewState = stop_expiry_timer(State),
- {ok, TRef} = timer:apply_after(
- Expires, rabbit_amqqueue, maybe_expire, [self()]),
+ TRef = erlang:send_after(Expires, self(), maybe_expire),
NewState#q{expiry_timer_ref = TRef};
false ->
State
end.
-ensure_stats_timer(State = #q{stats_timer = StatsTimer,
- q = Q}) ->
- State#q{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
+ensure_stats_timer(State = #q { stats_timer = StatsTimer,
+ q = #amqqueue { pid = QPid }}) ->
+ State #q { stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer, QPid, emit_stats) }.
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -700,8 +696,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
when TTL =/= undefined ->
case BQ:is_empty(BQS) of
true -> State;
- false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired,
- [self()]),
+ false -> TRef = erlang:send_after(TTL, self(), drop_expired),
State#q{ttl_timer_ref = TRef}
end;
ensure_ttl_timer(State) ->
@@ -810,7 +805,14 @@ prioritise_cast(Msg, _State) ->
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
#q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
-prioritise_info(_Msg, _State) -> 0.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ _ -> 0
+ end.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -1085,15 +1087,6 @@ handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
-handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
-
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
@@ -1101,24 +1094,33 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
+ noreply(State).
+
+handle_info(update_ram_duration, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State#q{rate_timer_ref = just_measured,
+ backing_queue_state = BQS2});
-handle_cast(maybe_expire, State) ->
+handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
-handle_cast(drop_expired, State) ->
+handle_info(drop_expired, State) ->
noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
-handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
assert_invariant(State1),
- {noreply, State1, hibernate}.
+ {noreply, State1, hibernate};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 13fb7ce1..6fbbc93e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -295,11 +295,6 @@ handle_cast({deliver, ConsumerTag, AckRequired,
rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
- internal_emit_stats(State),
- noreply([ensure_stats_timer],
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
@@ -307,6 +302,12 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
+handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
+ internal_emit_stats(State),
+ noreply([ensure_stats_timer],
+ State#ch{
+ stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
+
handle_info({'DOWN', MRef, process, QPid, Reason},
State = #ch{consumer_monitors = ConsumerMonitors}) ->
noreply(
@@ -370,8 +371,7 @@ next_state(Mask, State) ->
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> emit_stats(ChPid) end)}.
+ StatsTimer, ChPid, emit_stats)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 468f9293..887e4a1f 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
+-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]).
-export([reset_stats_timer/1]).
-export([stats_level/1, if_enabled/2]).
-export([notify/2, notify_if/3]).
@@ -57,7 +57,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()).
-spec(stop_stats_timer/1 :: (state()) -> state()).
-spec(reset_stats_timer/1 :: (state()) -> state()).
-spec(stats_level/1 :: (state()) -> level()).
@@ -80,7 +80,7 @@ start_link() ->
%% if_enabled(internal_emit_stats) - so we immediately send something
%%
%% On wakeup:
-%% ensure_stats_timer(Timer, emit_stats)
+%% ensure_stats_timer(Timer, Pid, emit_stats)
%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
%%
%% emit_stats:
@@ -99,13 +99,13 @@ init_stats_timer() ->
{ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
#state{level = StatsLevel, interval = Interval, timer = undefined}.
-ensure_stats_timer(State = #state{level = none}, _Fun) ->
+ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) ->
State;
ensure_stats_timer(State = #state{interval = Interval,
- timer = undefined}, Fun) ->
- {ok, TRef} = timer:apply_after(Interval, erlang, apply, [Fun, []]),
+ timer = undefined}, Pid, Msg) ->
+ TRef = erlang:send_after(Interval, Pid, Msg),
State#state{timer = TRef};
-ensure_stats_timer(State, _Fun) ->
+ensure_stats_timer(State, _Pid, _Msg) ->
State.
stop_stats_timer(State = #state{level = none}) ->
@@ -113,7 +113,7 @@ stop_stats_timer(State = #state{level = none}) ->
stop_stats_timer(State = #state{timer = undefined}) ->
State;
stop_stats_timer(State = #state{timer = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ _ = erlang:cancel_timer(TRef),
State#state{timer = undefined}.
reset_stats_timer(State) ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 27de1f77..6c5035a0 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -23,14 +23,14 @@
client_ref/1, close_all_indicated/1,
write/3, read/2, contains/2, remove/2, sync/3]).
--export([sync/1, set_maximum_since_use/2,
- has_readers/2, combine_files/3, delete_file/2]). %% internal
+-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
+ delete_file/2]). %% internal
-export([transform_dir/3, force_recovery/2]). %% upgrade
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2,
- format_message_queue/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, prioritise_call/3, prioritise_cast/2,
+ prioritise_info/2, format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -154,7 +154,6 @@
-spec(sync/3 ::
([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
--spec(sync/1 :: (server()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
@@ -444,9 +443,6 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
-sync(Server) ->
- gen_server2:cast(Server, sync).
-
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -683,7 +679,6 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- sync -> 8;
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
{set_maximum_since_use, _Age} -> 8;
@@ -691,6 +686,12 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ sync -> 8;
+ _ -> 0
+ end.
+
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
@@ -774,9 +775,6 @@ handle_cast({sync, MsgIds, K},
true -> noreply(State #msstate { on_sync = [K | Syncs] })
end;
-handle_cast(sync, State) ->
- noreply(internal_sync(State));
-
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
@@ -800,6 +798,9 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
+handle_info(sync, State) ->
+ noreply(internal_sync(State));
+
handle_info(timeout, State) ->
noreply(internal_sync(State));
@@ -866,13 +867,13 @@ next_state(State = #msstate { on_sync = Syncs,
end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]),
+ TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync),
State #msstate { sync_timer_ref = TRef }.
stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
State;
stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
+ _ = erlang:cancel_timer(TRef),
State #msstate { sync_timer_ref = undefined }.
internal_sync(State = #msstate { current_file_handle = CurHdl,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index dffabf85..3bc0e389 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -28,8 +28,6 @@
-export([process_channel_frame/5]). %% used by erlang-client
--export([emit_stats/1]).
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
@@ -70,7 +68,6 @@
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
--spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
@@ -126,9 +123,6 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
-emit_stats(Pid) ->
- gen_server:cast(Pid, emit_stats).
-
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -323,7 +317,7 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
catch Error -> {error, Error}
end),
mainloop(Deb, State);
-handle_other({'$gen_cast', emit_stats}, Deb, State) ->
+handle_other(emit_stats, Deb, State) ->
mainloop(Deb, internal_emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
@@ -591,10 +585,8 @@ refuse_connection(Sock, Exception) ->
ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
connection_state = running}) ->
- Self = self(),
State#v1{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> emit_stats(Self) end)};
+ StatsTimer, self(), emit_stats)};
ensure_stats_timer(State) ->
State.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index ec1ee9cd..405949ef 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -76,7 +76,6 @@
%% Internal exports
-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
-export([handle_cast/2]).
--export([delayed_restart/2]).
-define(DICT, dict).
@@ -157,9 +156,6 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
end;
check_childspecs(X) -> {error, {badarg, X}}.
-delayed_restart(Supervisor, RestartDetails) ->
- gen_server:cast(Supervisor, {delayed_restart, RestartDetails}).
-
%%% ---------------------------------------------------
%%%
%%% Initialize the supervisor.
@@ -355,12 +351,19 @@ handle_call(which_children, _From, State) ->
State#state.children),
{reply, Resp, State}.
+%%% Hopefully cause a function-clause as there is no API function
+%%% that utilizes cast.
+handle_cast(null, State) ->
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ []),
+
+ {noreply, State}.
-handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State)
when ?is_simple(State) ->
{ok, NState} = do_restart(RestartType, Reason, Child, State),
{noreply, NState};
-handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State) ->
case get_child(Child#child.name, State) of
{value, Child1} ->
{ok, NState} = do_restart(RestartType, Reason, Child1, State),
@@ -369,14 +372,6 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
{noreply, State}
end;
-%%% Hopefully cause a function-clause as there is no API function
-%%% that utilizes cast.
-handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
- []),
-
- {noreply, State}.
-
%%
%% Take care of terminated children.
%%
@@ -539,9 +534,9 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
{ok, NState} ->
{ok, NState};
{terminate, NState} ->
- {ok, _TRef} = timer:apply_after(
- trunc(Delay*1000), ?MODULE, delayed_restart,
- [self(), {{RestartType, Delay}, Reason, Child}]),
+ _TRef = erlang:send_after(trunc(Delay*1000), self(),
+ {delayed_restart,
+ {{RestartType, Delay}, Reason, Child}}),
{ok, state_del_child(Child, NState)}
end;
do_restart(permanent, Reason, Child, State) ->