diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 11:24:35 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 11:24:35 +0100 |
commit | 7630d9cca83bac5cfa4605077b83ee0d7670ff50 (patch) | |
tree | f021e622d77198f3e3936628ea15140d22c193ce | |
parent | 738fbf41a74347f26986f09eaeea7d71e5d546aa (diff) | |
parent | f2e188bbc4a9d8a3cc147275427adb284d5f0505 (diff) | |
download | rabbitmq-server-7630d9cca83bac5cfa4605077b83ee0d7670ff50.tar.gz |
merge bug23245 into default
-rw-r--r-- | Makefile | 16 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 49 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
-rw-r--r-- | src/rabbit_dialyzer.erl | 25 | ||||
-rw-r--r-- | src/rabbit_event.erl | 71 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 43 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 25 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 17 |
11 files changed, 180 insertions, 105 deletions
@@ -92,12 +92,13 @@ endif all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) + rm -f $@ escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< -$(EBIN_DIR)/%.beam: +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl | $(DEPS_FILE) erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) @@ -111,7 +112,11 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ - "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))." + "rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\")." \ + -eval \ + "init:stop()." + + # rabbit.plt is used by rabbitmq-erlang-client's dialyze make target create-plt: $(RABBIT_PLT) @@ -308,11 +313,6 @@ else TESTABLEGOALS:=$(MAKECMDGOALS) endif -ifneq "$(strip $(TESTABLEGOALS))" "$(DEPS_FILE)" ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" -ifeq "$(strip $(wildcard $(DEPS_FILE)))" "" -$(info $(shell $(MAKE) $(DEPS_FILE))) -endif -include $(DEPS_FILE) -endif +-include $(DEPS_FILE) endif diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 4afc66ac..02da0cc6 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,7 +7,10 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} +# erlang-inets is not a strict dependency, but it's needed to allow +# the installation of plugins that use mochiweb. Ideally it would be a +# "Recommends" instead, but gdebi does not install those. +Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3e677c38..42bddc5e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete/3, purge/1]). +-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, @@ -115,6 +115,9 @@ (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_exclusive/1 :: (rabbit_types:amqqueue()) + -> rabbit_types:ok_or_error2(qlen(), + 'not_exclusive')). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -359,6 +362,9 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). +delete_exclusive(#amqqueue{ pid = QPid }) -> + gen_server2:call(QPid, delete_exclusive, infinity). + delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 91877efb..2c53a8e3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -43,7 +43,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2]). -import(queue). -import(erlang). @@ -153,7 +153,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined}) -> + backing_queue = BQ, backing_queue_state = undefined, + stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; Q -> gen_server2:reply(From, {new, Q}), @@ -164,9 +165,12 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), + State1 = init_expires(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State)), - noreply(init_expires(State#q{backing_queue_state = BQS})); + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(StatsTimer, + fun() -> emit_stats(State1) end), + noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -266,14 +270,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer, q = Q}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(State) end, fun() -> rabbit_amqqueue:emit_stats(Q) end)}. -stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> - State#q{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> emit_stats(State) end)}. - assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -595,6 +593,7 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; + delete_exclusive -> 8; {maybe_run_queue_via_backing_queue, _Fun} -> 6; _ -> 0 end. @@ -613,6 +612,10 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; +prioritise_info(_Msg, _State) -> 0. + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -782,6 +785,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ, active_consumers = ActiveConsumers}) -> reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State); +handle_call(delete_exclusive, _From, + State = #q{ backing_queue_state = BQS, + backing_queue = BQ, + q = #amqqueue{exclusive_owner = Owner} + }) when Owner =/= none -> + {stop, normal, {ok, BQ:len(BQS)}, State}; + +handle_call(delete_exclusive, _From, State) -> + reply({error, not_exclusive}, State); + handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), @@ -910,9 +923,12 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(emit_stats, State) -> +handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> + %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - noreply(State). + State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + assert_invariant(State1), + {noreply, State1}. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -943,11 +959,14 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + stats_timer = StatsTimer}) -> BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_stats_timer( - stop_rate_timer(State#q{backing_queue_state = BQS2}))}. + rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end), + State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), + backing_queue_state = BQS2}, + {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f19f98d2..bde11f00 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -158,6 +158,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), + StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -175,8 +176,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = rabbit_event:init_stats_timer()}, + stats_timer = StatsTimer}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), + rabbit_event:if_enabled(StatsTimer, + fun() -> internal_emit_stats(State) end), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -251,17 +254,22 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast(emit_stats, State) -> +handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), - {noreply, State}. + {noreply, + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. -handle_pre_hibernate(State) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - {hibernate, stop_stats_timer(State)}. + rabbit_event:if_enabled(StatsTimer, fun () -> + internal_emit_stats(State) + end), + {hibernate, + State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -291,14 +299,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> internal_emit_stats(State) end, fun() -> emit_stats(ChPid) end)}. -stop_stats_timer(State = #ch{stats_timer = StatsTimer}) -> - State#ch{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> internal_emit_stats(State) end)}. - return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 51bd6b1f..a9806305 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -61,26 +61,27 @@ add_to_plt(PltPath, FilesString) -> {init_plt, PltPath}, {output_plt, PltPath}, {files, Files}]), - print_warnings(DialyzerWarnings), + print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1), ok. dialyze_files(PltPath, ModifiedFiles) -> Files = string:tokens(ModifiedFiles, " "), DialyzerWarnings = dialyzer:run([{init_plt, PltPath}, - {files, Files}]), + {files, Files}, + {warnings, [behaviours, + race_conditions]}]), case DialyzerWarnings of - [] -> io:format("~nOk~n"), - ok; - _ -> io:format("~nFAILED with the following warnings:~n"), - print_warnings(DialyzerWarnings), - fail - end. - -print_warnings(Warnings) -> - [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings], - io:format("~n"), + [] -> io:format("~nOk~n"); + _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n", + [length(DialyzerWarnings)]), + print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1) + end, ok. +print_warnings(Warnings, FormatFun) -> + [io:format("~s~n", [FormatFun(W)]) || W <- Warnings], + io:format("~n"). + otp_apps_dependencies_paths() -> [code:lib_dir(App, ebin) || App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]]. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 0f00537a..2b236531 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -34,9 +34,9 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). --export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). --export([stats_level/1]). +-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). +-export([reset_stats_timer/1]). +-export([stats_level/1, if_enabled/2]). -export([notify/2]). %%---------------------------------------------------------------------------- @@ -71,11 +71,11 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). --spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). --spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()). --spec(reset_stats_timer_after/1 :: (state()) -> state()). +-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(stop_stats_timer/1 :: (state()) -> state()). +-spec(reset_stats_timer/1 :: (state()) -> state()). -spec(stats_level/1 :: (state()) -> level()). +-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -endif. @@ -85,44 +85,61 @@ start_link() -> gen_event:start_link({local, ?MODULE}). +%% The idea is, for each stat-emitting object: +%% +%% On startup: +%% Timer = init_stats_timer() +%% notify(created event) +%% if_enabled(internal_emit_stats) - so we immediately send something +%% +%% On wakeup: +%% ensure_stats_timer(Timer, emit_stats) +%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) +%% +%% emit_stats: +%% if_enabled(internal_emit_stats) +%% reset_stats_timer(Timer) - just bookkeeping +%% +%% Pre-hibernation: +%% if_enabled(internal_emit_stats) +%% stop_stats_timer(Timer) +%% +%% internal_emit_stats: +%% notify(stats) + init_stats_timer() -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), #state{level = StatsLevel, timer = undefined}. -ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) -> +ensure_stats_timer(State = #state{level = none}, _Fun) -> State; -ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) -> - NowFun(), - {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), +ensure_stats_timer(State = #state{timer = undefined}, Fun) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + erlang, apply, [Fun, []]), State#state{timer = TRef}; -ensure_stats_timer(State, _NowFun, _TimerFun) -> +ensure_stats_timer(State, _Fun) -> State. -stop_stats_timer(State = #state{level = none}, _NowFun) -> +stop_stats_timer(State = #state{level = none}) -> State; -stop_stats_timer(State = #state{timer = undefined}, _NowFun) -> +stop_stats_timer(State = #state{timer = undefined}) -> State; -stop_stats_timer(State = #state{timer = TRef}, NowFun) -> +stop_stats_timer(State = #state{timer = TRef}) -> {ok, cancel} = timer:cancel(TRef), - NowFun(), State#state{timer = undefined}. -ensure_stats_timer_after(State = #state{level = none}, _TimerFun) -> - State; -ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) -> - {ok, TRef} = timer:apply_after(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), - State#state{timer = TRef}; -ensure_stats_timer_after(State, _TimerFun) -> - State. - -reset_stats_timer_after(State) -> +reset_stats_timer(State) -> State#state{timer = undefined}. stats_level(#state{level = Level}) -> Level. +if_enabled(#state{level = none}, _Fun) -> + ok; +if_enabled(_State, Fun) -> + Fun(), + ok. + notify(Type, Props) -> try %% TODO: switch to os:timestamp() when we drop support for diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 0a49b94d..0b8efc8f 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -81,7 +81,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> fun () -> ok end, fun () -> erlang:demonitor(MonitorRef), - rabbit_amqqueue:delete(Q, false, false) + rabbit_amqqueue:delete_exclusive(Q) end) || {MonitorRef, Q} <- dict:to_list(Queues)], {reply, ok, State}. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d6b8bb28..0b98290c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -159,7 +159,9 @@ -define(PUB, {_, _}). %% {Guid, IsPersistent} --define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). +-define(READ_MODE, [binary, raw, read]). +-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). +-define(WRITE_MODE, [write | ?READ_MODE]). %%---------------------------------------------------------------------------- @@ -220,8 +222,13 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) -> - State = #qistate { dir = Dir } = blank_state(Name, not Recover), +init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name), + false = filelib:is_file(Dir), %% is_file == is file or dir + {0, [], State}; + +init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name), Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 @@ -356,15 +363,8 @@ recover(DurableQueues) -> %% startup and shutdown %%---------------------------------------------------------------------------- -blank_state(QueueName, EnsureFresh) -> - StrName = queue_name_to_dir_name(QueueName), - Dir = filename:join(queues_dir(), StrName), - ok = case EnsureFresh of - true -> false = filelib:is_file(Dir), %% is_file == is file or dir - ok; - false -> ok - end, - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +blank_state(QueueName) -> + Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)), {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -373,17 +373,21 @@ blank_state(QueueName, EnsureFresh) -> dirty_count = 0, max_journal_entries = MaxJournal }. +clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). + detect_clean_shutdown(Dir) -> - case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of + case file:delete(clean_file_name(Dir)) of ok -> true; {error, enoent} -> false end. read_shutdown_terms(Dir) -> - rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)). + rabbit_misc:read_term_file(clean_file_name(Dir)). store_clean_shutdown(Terms, Dir) -> - rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). + CleanFileName = clean_file_name(Dir), + ok = filelib:ensure_dir(CleanFileName), + rabbit_misc:write_term_file(CleanFileName, Terms). init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -500,7 +504,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName, false)), + recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); @@ -578,7 +582,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], + _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), ok = file_handle_cache:close(Hdl), @@ -588,7 +592,8 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], + ok = filelib:ensure_dir(Path), + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; get_journal_handle(State = #qistate { journal_handle = Hdl }) -> @@ -785,7 +790,7 @@ segment_entries_foldr(Fun, Init, load_segment(KeepAcked, #segment { path = Path }) -> case filelib:is_file(Path) of false -> {array_new(), 0}; - true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), + true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), ok = file_handle_cache:close(Hdl), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 252f81a3..745e0083 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> end), mainloop(Deb, State); {'$gen_cast', emit_stats} -> - internal_emit_stats(State), - mainloop(Deb, State#v1{stats_timer = - rabbit_event:reset_stats_timer_after( - State#v1.stats_timer)}); + State1 = internal_emit_stats(State), + mainloop(Deb, State1); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -690,11 +688,14 @@ refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), throw(Exception). -ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) -> +ensure_stats_timer(State = #v1{stats_timer = StatsTimer, + connection_state = running}) -> Self = self(), - State#v1{stats_timer = rabbit_event:ensure_stats_timer_after( + State#v1{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(Self) end)}. + fun() -> emit_stats(Self) end)}; +ensure_stats_timer(State) -> + State. %%-------------------------------------------------------------------------- @@ -765,7 +766,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - sock = Sock}) -> + sock = Sock, + stats_timer = StatsTimer}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -775,6 +777,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = NewConnection}), rabbit_event:notify(connection_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(StatsTimer, + fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -938,5 +942,6 @@ amqp_exception_explanation(Text, Expl) -> true -> CompleteTextBin end. -internal_emit_stats(State) -> - rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)). +internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> + rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), + State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index e658f005..067fa9f2 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -296,6 +296,12 @@ get_total_memory({unix, sunos}) -> Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)), dict:fetch('Memory size', Dict); +get_total_memory({unix, aix}) -> + File = cmd("/usr/bin/vmstat -v"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line_aix/1, Lines)), + dict:fetch('memory pages', Dict) * 4096; + get_total_memory(_OsType) -> unknown. @@ -341,6 +347,17 @@ parse_line_sunos(Line) -> [Name] -> {list_to_atom(Name), none} end. +%% Lines look like " 12345 memory pages" +%% or " 80.1 maxpin percentage" +parse_line_aix(Line) -> + [Value | NameWords] = string:tokens(Line, " "), + Name = string:join(NameWords, " "), + {list_to_atom(Name), + case lists:member($., Value) of + true -> trunc(list_to_float(Value)); + false -> list_to_integer(Value) + end}. + freebsd_sysctl(Def) -> list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n"). |