diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-08-03 12:30:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-08-03 12:30:31 +0100 |
commit | 3383b7ccaa079ebeca4b7ab701abed7d7324ca6e (patch) | |
tree | d1e531f6a78b2ab4fce7955e765ce292ad4de8a7 | |
parent | e2c57c78fcc0281eeb78dd1914287e539265244c (diff) | |
parent | 10f12a2b0d464dc6642ebd04eebb411468eba2c1 (diff) | |
download | rabbitmq-server-3383b7ccaa079ebeca4b7ab701abed7d7324ca6e.tar.gz |
Merge bug23504.
-rw-r--r-- | src/file_handle_cache.erl | 12 | ||||
-rw-r--r-- | src/gm.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 85 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 43 | ||||
-rw-r--r-- | src/rabbit_event.erl | 16 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 32 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 31 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 18 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/supervisor2.erl | 29 |
11 files changed, 135 insertions, 179 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 61b08d49..235e14c0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -925,10 +925,10 @@ handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, State)))}; + update_counts(obtain, FromPid, -1, State)))}. -handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. +handle_info(check_counts, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #fhc_state { elders = Elders, @@ -1133,9 +1133,9 @@ reduce(State = #fhc_state { open_pending = OpenPending, end end, case TRef of - undefined -> {ok, TRef1} = timer:apply_after( - ?FILE_HANDLES_CHECK_INTERVAL, - gen_server, cast, [?SERVER, check_counts]), + undefined -> TRef1 = erlang:send_after( + ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER, + check_counts), State #fhc_state { timer_ref = TRef1 }; _ -> State end. @@ -376,11 +376,11 @@ confirmed_broadcast/2, group_members/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_cast/2, prioritise_info/2]). + code_change/3, prioritise_info/2]). -export([behaviour_info/1]). --export([table_definitions/0, flush/1]). +-export([table_definitions/0]). -define(GROUP_TABLE, gm_group). -define(HIBERNATE_AFTER_MIN, 1000). @@ -511,9 +511,6 @@ confirmed_broadcast(Server, Msg) -> group_members(Server) -> gen_server2:call(Server, group_members, infinity). -flush(Server) -> - gen_server2:cast(Server, flush). - init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), @@ -629,12 +626,12 @@ handle_cast(join, State = #state { self = Self, {Module:joined(Args, all_known_members(View)), State1}); handle_cast(leave, State) -> - {stop, normal, State}; + {stop, normal, State}. -handle_cast(flush, State) -> - noreply( - flush_broadcast_buffer(State #state { broadcast_timer = undefined })). +handle_info(flush, State) -> + noreply( + flush_broadcast_buffer(State #state { broadcast_timer = undefined })); handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #state { self = Self, @@ -684,9 +681,7 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_cast(flush, _State) -> 1; -prioritise_cast(_ , _State) -> 0. - +prioritise_info(flush, _State) -> 1; prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; prioritise_info(_ , _State) -> 0. @@ -808,10 +803,10 @@ ensure_broadcast_timer(State = #state { broadcast_buffer = [], State; ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = TRef }) -> - timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { broadcast_timer = undefined }; ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> - {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]), + TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush), State #state { broadcast_timer = TRef }; ensure_broadcast_timer(State) -> State. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e9d01d12..eae6312b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,9 +32,7 @@ %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, - sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1, drop_expired/1, - emit_stats/1]). + set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -100,7 +98,6 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') @@ -142,11 +139,8 @@ -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(sync_timeout/1 :: (pid()) -> 'ok'). --spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). @@ -405,9 +399,6 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -emit_stats(#amqqueue{pid = QPid}) -> - delegate_cast(QPid, emit_stats). - delete_immediately(#amqqueue{ pid = QPid }) -> gen_server2:cast(QPid, delete_immediately). @@ -486,24 +477,12 @@ internal_delete(QueueName) -> run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). -sync_timeout(QPid) -> - gen_server2:cast(QPid, sync_timeout). - -update_ram_duration(QPid) -> - gen_server2:cast(QPid, update_ram_duration). - set_ram_duration_target(QPid, Duration) -> gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -maybe_expire(QPid) -> - gen_server2:cast(QPid, maybe_expire). - -drop_expired(QPid) -> - gen_server2:cast(QPid, drop_expired). - on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6019413..05de48d6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -249,8 +249,7 @@ backing_queue_module(#amqqueue{arguments = Args}) -> end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -258,14 +257,12 @@ ensure_sync_timer(State) -> stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> State; stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{sync_timer_ref = undefined}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), + TRef = erlang:send_after( + ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration), State#q{rate_timer_ref = TRef}; ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; @@ -277,13 +274,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{rate_timer_ref = undefined}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{expiry_timer_ref = undefined}. %% We wish to expire only when there are no consumers *and* the expiry @@ -295,18 +292,16 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of true -> NewState = stop_expiry_timer(State), - {ok, TRef} = timer:apply_after( - Expires, rabbit_amqqueue, maybe_expire, [self()]), + TRef = erlang:send_after(Expires, self(), maybe_expire), NewState#q{expiry_timer_ref = TRef}; false -> State end. ensure_stats_timer(State = #q{stats_timer = StatsTimer, - q = Q}) -> + q = #amqqueue{pid = QPid}}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> rabbit_amqqueue:emit_stats(Q) end)}. + StatsTimer, QPid, emit_stats)}. assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -700,8 +695,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; - false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, - [self()]), + false -> TRef = erlang:send_after(TTL, self(), drop_expired), State#q{ttl_timer_ref = TRef} end; ensure_ttl_timer(State) -> @@ -792,25 +786,27 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; _ -> 0 end. -prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, - #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; -prioritise_info(_Msg, _State) -> 0. +prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + case Msg of + {'DOWN', _, process, DownPid, _} -> 8; + update_ram_duration -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + sync_timeout -> 6; + _ -> 0 + end. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -1015,9 +1011,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast(sync_timeout, State) -> - noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); - handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); @@ -1085,15 +1078,6 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); -handle_cast(update_ram_duration, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State#q{rate_timer_ref = just_measured, - backing_queue_state = BQS2}); - handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), @@ -1101,24 +1085,24 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); + noreply(State). -handle_cast(maybe_expire, State) -> +handle_info(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(drop_expired, State) -> +handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); -handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -1135,6 +1119,18 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; +handle_info(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); + +handle_info(sync_timeout, State) -> + noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); + handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); @@ -1155,10 +1151,9 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), BQS3 = BQ:handle_pre_hibernate(BQS2), - rabbit_event:if_enabled(StatsTimer, - fun () -> - emit_stats(State, [{idle_since, now()}]) - end), + rabbit_event:if_enabled( + StatsTimer, + fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), backing_queue_state = BQS3}, {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 13fb7ce1..45f0032d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -23,11 +23,11 @@ -export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([refresh_config_all/0, emit_stats/1, ready_for_close/1]). +-export([refresh_config_all/0, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, format_message_queue/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter_pid, start_limiter_fun, tx_status, next_tag, @@ -91,7 +91,6 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(refresh_config_all/0 :: () -> 'ok'). --spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. @@ -153,9 +152,6 @@ refresh_config_all() -> fun (C) -> gen_server2:call(C, refresh_config) end, list()), ok. -emit_stats(Pid) -> - gen_server2:cast(Pid, emit_stats). - ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). @@ -196,7 +192,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, trace_state = rabbit_trace:init(VHost)}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, - fun() -> internal_emit_stats(State) end), + fun() -> emit_stats(State) end), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -209,11 +205,16 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - emit_stats -> 7; {confirm, _MsgSeqNos, _QPid} -> 5; _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + handle_call(flush, _From, State) -> reply(ok, State); @@ -295,11 +296,6 @@ handle_cast({deliver, ConsumerTag, AckRequired, rabbit_trace:tap_trace_out(Msg, TraceState), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> - internal_emit_stats(State), - noreply([ensure_stats_timer], - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); - handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). @@ -307,6 +303,11 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); +handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> + emit_stats(State), + noreply([ensure_stats_timer], + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); + handle_info({'DOWN', MRef, process, QPid, Reason}, State = #ch{consumer_monitors = ConsumerMonitors}) -> noreply( @@ -322,11 +323,8 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - rabbit_event:if_enabled(StatsTimer, - fun () -> - internal_emit_stats( - State, [{idle_since, now()}]) - end), + rabbit_event:if_enabled( + StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. @@ -370,8 +368,7 @@ next_state(Mask, State) -> ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> emit_stats(ChPid) end)}. + StatsTimer, ChPid, emit_stats)}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1495,10 +1492,10 @@ update_measures(Type, QX, Inc, Measure) -> put({Type, QX}, orddict:store(Measure, Cur + Inc, Measures)). -internal_emit_stats(State) -> - internal_emit_stats(State, []). +emit_stats(State) -> + emit_stats(State, []). -internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> +emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> CoarseStats = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(StatsTimer) of coarse -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 468f9293..bb765566 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). +-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]). -export([reset_stats_timer/1]). -export([stats_level/1, if_enabled/2]). -export([notify/2, notify_if/3]). @@ -57,7 +57,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()). -spec(stop_stats_timer/1 :: (state()) -> state()). -spec(reset_stats_timer/1 :: (state()) -> state()). -spec(stats_level/1 :: (state()) -> level()). @@ -80,7 +80,7 @@ start_link() -> %% if_enabled(internal_emit_stats) - so we immediately send something %% %% On wakeup: -%% ensure_stats_timer(Timer, emit_stats) +%% ensure_stats_timer(Timer, Pid, emit_stats) %% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) %% %% emit_stats: @@ -99,13 +99,13 @@ init_stats_timer() -> {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), #state{level = StatsLevel, interval = Interval, timer = undefined}. -ensure_stats_timer(State = #state{level = none}, _Fun) -> +ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) -> State; ensure_stats_timer(State = #state{interval = Interval, - timer = undefined}, Fun) -> - {ok, TRef} = timer:apply_after(Interval, erlang, apply, [Fun, []]), + timer = undefined}, Pid, Msg) -> + TRef = erlang:send_after(Interval, Pid, Msg), State#state{timer = TRef}; -ensure_stats_timer(State, _Fun) -> +ensure_stats_timer(State, _Pid, _Msg) -> State. stop_stats_timer(State = #state{level = none}) -> @@ -113,7 +113,7 @@ stop_stats_timer(State = #state{level = none}) -> stop_stats_timer(State = #state{timer = undefined}) -> State; stop_stats_timer(State = #state{timer = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#state{timer = undefined}. reset_stats_timer(State) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b38a8967..12b6f3ca 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -37,7 +37,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -187,9 +187,9 @@ handle_cast({set_ram_duration_target, Duration}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - noreply(State #state { backing_queue_state = BQS1 }); + noreply(State #state { backing_queue_state = BQS1 }). -handle_cast(update_ram_duration, +handle_info(update_ram_duration, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), @@ -199,9 +199,9 @@ handle_cast(update_ram_duration, noreply(State #state { rate_timer_ref = just_measured, backing_queue_state = BQS2 }); -handle_cast(sync_timeout, State) -> +handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( - State #state { sync_timer_ref = undefined })). + State #state { sync_timer_ref = undefined })); handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); @@ -266,16 +266,21 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; {gm, _Msg} -> 5; {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + sync_timeout -> 6; + _ -> 0 + end. + %% --------------------------------------------------------------------------- %% GM %% --------------------------------------------------------------------------- @@ -516,8 +521,7 @@ backing_queue_timeout(State = #state { backing_queue = BQ }) -> run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), State #state { sync_timer_ref = TRef }; ensure_sync_timer(State) -> State. @@ -525,14 +529,12 @@ ensure_sync_timer(State) -> stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> State; stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { sync_timer_ref = undefined }. ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), + TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), State #state { rate_timer_ref = TRef }; ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> State #state { rate_timer_ref = undefined }; @@ -544,7 +546,7 @@ stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> State #state { rate_timer_ref = undefined }; stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 27de1f77..e90e1281 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -23,14 +23,14 @@ client_ref/1, close_all_indicated/1, write/3, read/2, contains/2, remove/2, sync/3]). --export([sync/1, set_maximum_since_use/2, - has_readers/2, combine_files/3, delete_file/2]). %% internal +-export([set_maximum_since_use/2, has_readers/2, combine_files/3, + delete_file/2]). %% internal -export([transform_dir/3, force_recovery/2]). %% upgrade --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2, - format_message_queue/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, prioritise_call/3, prioritise_cast/2, + prioritise_info/2, format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -154,7 +154,6 @@ -spec(sync/3 :: ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). --spec(sync/1 :: (server()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> @@ -444,9 +443,6 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). -sync(Server) -> - gen_server2:cast(Server, sync). - set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -683,7 +679,6 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - sync -> 8; {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; @@ -691,6 +686,12 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + sync -> 8; + _ -> 0 + end. + handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); @@ -774,9 +775,6 @@ handle_cast({sync, MsgIds, K}, true -> noreply(State #msstate { on_sync = [K | Syncs] }) end; -handle_cast(sync, State) -> - noreply(internal_sync(State)); - handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, @@ -800,6 +798,9 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). +handle_info(sync, State) -> + noreply(internal_sync(State)); + handle_info(timeout, State) -> noreply(internal_sync(State)); @@ -866,13 +867,13 @@ next_state(State = #msstate { on_sync = Syncs, end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync), State #msstate { sync_timer_ref = TRef }. stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> State; stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #msstate { sync_timer_ref = undefined }. internal_sync(State = #msstate { current_file_handle = CurHdl, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index dffabf85..2dccc748 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -28,8 +28,6 @@ -export([process_channel_frame/5]). %% used by erlang-client --export([emit_stats/1]). - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). @@ -70,7 +68,6 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). --spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -126,9 +123,6 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. -emit_stats(Pid) -> - gen_server:cast(Pid, emit_stats). - conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -323,8 +317,8 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> catch Error -> {error, Error} end), mainloop(Deb, State); -handle_other({'$gen_cast', emit_stats}, Deb, State) -> - mainloop(Deb, internal_emit_stats(State)); +handle_other(emit_stats, Deb, State) -> + mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other(Other, _Deb, _State) -> @@ -591,10 +585,8 @@ refuse_connection(Sock, Exception) -> ensure_stats_timer(State = #v1{stats_timer = StatsTimer, connection_state = running}) -> - Self = self(), State#v1{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> emit_stats(Self) end)}; + StatsTimer, self(), emit_stats)}; ensure_stats_timer(State) -> State. @@ -694,7 +686,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), rabbit_event:if_enabled(StatsTimer, - fun() -> internal_emit_stats(State1) end), + fun() -> emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), @@ -923,6 +915,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, State1#v1.sock, 0, CloseMethod, Protocol), State1. -internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> +emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2a3ced92..ed4efb47 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1287,7 +1287,7 @@ test_statistics_event_receiver(Pid) -> test_statistics_receive_event(Ch, Matcher) -> rabbit_channel:flush(Ch), - rabbit_channel:emit_stats(Ch), + Ch ! emit_stats, test_statistics_receive_event1(Ch, Matcher). test_statistics_receive_event1(Ch, Matcher) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index ec1ee9cd..405949ef 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -76,7 +76,6 @@ %% Internal exports -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). -export([handle_cast/2]). --export([delayed_restart/2]). -define(DICT, dict). @@ -157,9 +156,6 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. -delayed_restart(Supervisor, RestartDetails) -> - gen_server:cast(Supervisor, {delayed_restart, RestartDetails}). - %%% --------------------------------------------------- %%% %%% Initialize the supervisor. @@ -355,12 +351,19 @@ handle_call(which_children, _From, State) -> State#state.children), {reply, Resp, State}. +%%% Hopefully cause a function-clause as there is no API function +%%% that utilizes cast. +handle_cast(null, State) -> + error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", + []), + + {noreply, State}. -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) when ?is_simple(State) -> {ok, NState} = do_restart(RestartType, Reason, Child, State), {noreply, NState}; -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of {value, Child1} -> {ok, NState} = do_restart(RestartType, Reason, Child1, State), @@ -369,14 +372,6 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> {noreply, State} end; -%%% Hopefully cause a function-clause as there is no API function -%%% that utilizes cast. -handle_cast(null, State) -> - error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", - []), - - {noreply, State}. - %% %% Take care of terminated children. %% @@ -539,9 +534,9 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> {ok, NState} -> {ok, NState}; {terminate, NState} -> - {ok, _TRef} = timer:apply_after( - trunc(Delay*1000), ?MODULE, delayed_restart, - [self(), {{RestartType, Delay}, Reason, Child}]), + _TRef = erlang:send_after(trunc(Delay*1000), self(), + {delayed_restart, + {{RestartType, Delay}, Reason, Child}}), {ok, state_del_child(Child, NState)} end; do_restart(permanent, Reason, Child, State) -> |