summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 11:24:35 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 11:24:35 +0100
commit7630d9cca83bac5cfa4605077b83ee0d7670ff50 (patch)
treef021e622d77198f3e3936628ea15140d22c193ce
parent738fbf41a74347f26986f09eaeea7d71e5d546aa (diff)
parentf2e188bbc4a9d8a3cc147275427adb284d5f0505 (diff)
downloadrabbitmq-server-7630d9cca83bac5cfa4605077b83ee0d7670ff50.tar.gz
merge bug23245 into default
-rw-r--r--Makefile16
-rw-r--r--packaging/debs/Debian/debian/control5
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl49
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_dialyzer.erl25
-rw-r--r--src/rabbit_event.erl71
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl43
-rw-r--r--src/rabbit_reader.erl25
-rw-r--r--src/vm_memory_monitor.erl17
11 files changed, 180 insertions, 105 deletions
diff --git a/Makefile b/Makefile
index 46b18425..7c18cc7f 100644
--- a/Makefile
+++ b/Makefile
@@ -92,12 +92,13 @@ endif
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
+ rm -f $@
escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
-$(EBIN_DIR)/%.beam:
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl | $(DEPS_FILE)
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
@@ -111,7 +112,11 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
- "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))."
+ "rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\")." \
+ -eval \
+ "init:stop()."
+
+
# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
create-plt: $(RABBIT_PLT)
@@ -308,11 +313,6 @@ else
TESTABLEGOALS:=$(MAKECMDGOALS)
endif
-ifneq "$(strip $(TESTABLEGOALS))" "$(DEPS_FILE)"
ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
-ifeq "$(strip $(wildcard $(DEPS_FILE)))" ""
-$(info $(shell $(MAKE) $(DEPS_FILE)))
-endif
-include $(DEPS_FILE)
-endif
+-include $(DEPS_FILE)
endif
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 4afc66ac..02da0cc6 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -7,7 +7,10 @@ Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+# erlang-inets is not a strict dependency, but it's needed to allow
+# the installation of plugins that use mochiweb. Ideally it would be a
+# "Recommends" instead, but gdebi does not install those.
+Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3e677c38..42bddc5e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,6 +115,9 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
+ -> rabbit_types:ok_or_error2(qlen(),
+ 'not_exclusive')).
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -359,6 +362,9 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
+delete_exclusive(#amqqueue{ pid = QPid }) ->
+ gen_server2:call(QPid, delete_exclusive, infinity).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 91877efb..2c53a8e3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -43,7 +43,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2]).
-import(queue).
-import(erlang).
@@ -153,7 +153,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
+ backing_queue = BQ, backing_queue_state = undefined,
+ stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
@@ -164,9 +165,12 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
+ State1 = init_expires(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State)),
- noreply(init_expires(State#q{backing_queue_state = BQS}));
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -266,14 +270,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer,
q = Q}) ->
State#q{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(State) end,
fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
-stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
- State#q{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> emit_stats(State) end)}.
-
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -595,6 +593,7 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
+ delete_exclusive -> 8;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
_ -> 0
end.
@@ -613,6 +612,10 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
+prioritise_info(_Msg, _State) -> 0.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -782,6 +785,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
active_consumers = ActiveConsumers}) ->
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+handle_call(delete_exclusive, _From,
+ State = #q{ backing_queue_state = BQS,
+ backing_queue = BQ,
+ q = #amqqueue{exclusive_owner = Owner}
+ }) when Owner =/= none ->
+ {stop, normal, {ok, BQ:len(BQS)}, State};
+
+handle_call(delete_exclusive, _From, State) ->
+ reply({error, not_exclusive}, State);
+
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
@@ -910,9 +923,12 @@ handle_cast(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+ %% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- noreply(State).
+ State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
+ assert_invariant(State1),
+ {noreply, State1}.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -943,11 +959,14 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ stats_timer = StatsTimer}) ->
BQS1 = BQ:handle_pre_hibernate(BQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_stats_timer(
- stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
+ rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end),
+ State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
+ backing_queue_state = BQS2},
+ {hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f19f98d2..bde11f00 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -158,6 +158,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
+ StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -175,8 +176,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = rabbit_event:init_stats_timer()},
+ stats_timer = StatsTimer},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State) end),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -251,17 +254,22 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
- {noreply, State}.
+ {noreply,
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}.
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- {hibernate, stop_stats_timer(State)}.
+ rabbit_event:if_enabled(StatsTimer, fun () ->
+ internal_emit_stats(State)
+ end),
+ {hibernate,
+ State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -291,14 +299,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> internal_emit_stats(State) end,
fun() -> emit_stats(ChPid) end)}.
-stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- State#ch{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> internal_emit_stats(State) end)}.
-
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index 51bd6b1f..a9806305 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -61,26 +61,27 @@ add_to_plt(PltPath, FilesString) ->
{init_plt, PltPath},
{output_plt, PltPath},
{files, Files}]),
- print_warnings(DialyzerWarnings),
+ print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1),
ok.
dialyze_files(PltPath, ModifiedFiles) ->
Files = string:tokens(ModifiedFiles, " "),
DialyzerWarnings = dialyzer:run([{init_plt, PltPath},
- {files, Files}]),
+ {files, Files},
+ {warnings, [behaviours,
+ race_conditions]}]),
case DialyzerWarnings of
- [] -> io:format("~nOk~n"),
- ok;
- _ -> io:format("~nFAILED with the following warnings:~n"),
- print_warnings(DialyzerWarnings),
- fail
- end.
-
-print_warnings(Warnings) ->
- [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings],
- io:format("~n"),
+ [] -> io:format("~nOk~n");
+ _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n",
+ [length(DialyzerWarnings)]),
+ print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1)
+ end,
ok.
+print_warnings(Warnings, FormatFun) ->
+ [io:format("~s~n", [FormatFun(W)]) || W <- Warnings],
+ io:format("~n").
+
otp_apps_dependencies_paths() ->
[code:lib_dir(App, ebin) ||
App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]].
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 0f00537a..2b236531 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -34,9 +34,9 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
--export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
--export([stats_level/1]).
+-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
+-export([reset_stats_timer/1]).
+-export([stats_level/1, if_enabled/2]).
-export([notify/2]).
%%----------------------------------------------------------------------------
@@ -71,11 +71,11 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
--spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
--spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
--spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(stop_stats_timer/1 :: (state()) -> state()).
+-spec(reset_stats_timer/1 :: (state()) -> state()).
-spec(stats_level/1 :: (state()) -> level()).
+-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-endif.
@@ -85,44 +85,61 @@
start_link() ->
gen_event:start_link({local, ?MODULE}).
+%% The idea is, for each stat-emitting object:
+%%
+%% On startup:
+%% Timer = init_stats_timer()
+%% notify(created event)
+%% if_enabled(internal_emit_stats) - so we immediately send something
+%%
+%% On wakeup:
+%% ensure_stats_timer(Timer, emit_stats)
+%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
+%%
+%% emit_stats:
+%% if_enabled(internal_emit_stats)
+%% reset_stats_timer(Timer) - just bookkeeping
+%%
+%% Pre-hibernation:
+%% if_enabled(internal_emit_stats)
+%% stop_stats_timer(Timer)
+%%
+%% internal_emit_stats:
+%% notify(stats)
+
init_stats_timer() ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
#state{level = StatsLevel, timer = undefined}.
-ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
-ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
- NowFun(),
- {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
+ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
-ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ensure_stats_timer(State, _Fun) ->
State.
-stop_stats_timer(State = #state{level = none}, _NowFun) ->
+stop_stats_timer(State = #state{level = none}) ->
State;
-stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
+stop_stats_timer(State = #state{timer = undefined}) ->
State;
-stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+stop_stats_timer(State = #state{timer = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- NowFun(),
State#state{timer = undefined}.
-ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
- State;
-ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
- {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
- State#state{timer = TRef};
-ensure_stats_timer_after(State, _TimerFun) ->
- State.
-
-reset_stats_timer_after(State) ->
+reset_stats_timer(State) ->
State#state{timer = undefined}.
stats_level(#state{level = Level}) ->
Level.
+if_enabled(#state{level = none}, _Fun) ->
+ ok;
+if_enabled(_State, Fun) ->
+ Fun(),
+ ok.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0a49b94d..0b8efc8f 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -81,7 +81,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete(Q, false, false)
+ rabbit_amqqueue:delete_exclusive(Q)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d6b8bb28..0b98290c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -159,7 +159,9 @@
-define(PUB, {_, _}). %% {Guid, IsPersistent}
--define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
+-define(READ_MODE, [binary, raw, read]).
+-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(WRITE_MODE, [write | ?READ_MODE]).
%%----------------------------------------------------------------------------
@@ -220,8 +222,13 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name, not Recover),
+init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
+ false = filelib:is_file(Dir), %% is_file == is file or dir
+ {0, [], State};
+
+init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
@@ -356,15 +363,8 @@ recover(DurableQueues) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
-blank_state(QueueName, EnsureFresh) ->
- StrName = queue_name_to_dir_name(QueueName),
- Dir = filename:join(queues_dir(), StrName),
- ok = case EnsureFresh of
- true -> false = filelib:is_file(Dir), %% is_file == is file or dir
- ok;
- false -> ok
- end,
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+blank_state(QueueName) ->
+ Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -373,17 +373,21 @@ blank_state(QueueName, EnsureFresh) ->
dirty_count = 0,
max_journal_entries = MaxJournal }.
+clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
+
detect_clean_shutdown(Dir) ->
- case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ case file:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)).
+ rabbit_misc:read_term_file(clean_file_name(Dir)).
store_clean_shutdown(Terms, Dir) ->
- rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+ CleanFileName = clean_file_name(Dir),
+ ok = filelib:ensure_dir(CleanFileName),
+ rabbit_misc:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -500,7 +504,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName, false)),
+ recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
gatherer:in(Gatherer, {Guid, 1});
@@ -578,7 +582,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
ok = file_handle_cache:close(Hdl),
@@ -588,7 +592,8 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ ok = filelib:ensure_dir(Path),
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
@@ -785,7 +790,7 @@ segment_entries_foldr(Fun, Init,
load_segment(KeepAcked, #segment { path = Path }) ->
case filelib:is_file(Path) of
false -> {array_new(), 0};
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
ok = file_handle_cache:close(Hdl),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 252f81a3..745e0083 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
end),
mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
- internal_emit_stats(State),
- mainloop(Deb, State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ State1 = internal_emit_stats(State),
+ mainloop(Deb, State1);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -690,11 +688,14 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
-ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
+ connection_state = running}) ->
Self = self(),
- State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(Self) end)}.
+ fun() -> emit_stats(Self) end)};
+ensure_stats_timer(State) ->
+ State.
%%--------------------------------------------------------------------------
@@ -765,7 +766,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- sock = Sock}) ->
+ sock = Sock,
+ stats_timer = StatsTimer}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -775,6 +777,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = NewConnection}),
rabbit_event:notify(connection_created,
infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -938,5 +942,6 @@ amqp_exception_explanation(Text, Expl) ->
true -> CompleteTextBin
end.
-internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).
+internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
+ State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index e658f005..067fa9f2 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -296,6 +296,12 @@ get_total_memory({unix, sunos}) ->
Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)),
dict:fetch('Memory size', Dict);
+get_total_memory({unix, aix}) ->
+ File = cmd("/usr/bin/vmstat -v"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_aix/1, Lines)),
+ dict:fetch('memory pages', Dict) * 4096;
+
get_total_memory(_OsType) ->
unknown.
@@ -341,6 +347,17 @@ parse_line_sunos(Line) ->
[Name] -> {list_to_atom(Name), none}
end.
+%% Lines look like " 12345 memory pages"
+%% or " 80.1 maxpin percentage"
+parse_line_aix(Line) ->
+ [Value | NameWords] = string:tokens(Line, " "),
+ Name = string:join(NameWords, " "),
+ {list_to_atom(Name),
+ case lists:member($., Value) of
+ true -> trunc(list_to_float(Value));
+ false -> list_to_integer(Value)
+ end}.
+
freebsd_sysctl(Def) ->
list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").