summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-04-02 18:29:05 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-04-02 18:29:05 +0100
commit9e86579e3accb67bd9e604dccc074cbc8043f10d (patch)
tree41e723f52f9e08a6d6b39c595f1fd2b51d54fb82
parenta9eb53c6404c187c76836be76bbd33960942a20a (diff)
parent3d63f46b4f94c777a76e62fe951e4b2c96d0c24e (diff)
downloadrabbitmq-server-9e86579e3accb67bd9e604dccc074cbc8043f10d.tar.gz
merge default into bug24725
-rw-r--r--Makefile8
-rw-r--r--docs/rabbitmqctl.1.xml6
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--src/dtree.erl111
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_alarm.erl89
-rw-r--r--src/rabbit_amqqueue_process.erl119
-rw-r--r--src/rabbit_basic.erl6
-rw-r--r--src/rabbit_channel.erl102
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_disk_monitor.erl196
-rw-r--r--src/rabbit_misc.erl22
-rw-r--r--src/rabbit_reader.erl34
-rw-r--r--src/rabbit_sup.erl24
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/vm_memory_monitor.erl4
17 files changed, 493 insertions, 245 deletions
diff --git a/Makefile b/Makefile
index e8975856..d16cd4d0 100644
--- a/Makefile
+++ b/Makefile
@@ -216,12 +216,12 @@ start-rabbit-on-node: all
stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
-set-memory-alarm: all
- echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \
+set-resource-alarm: all
+ echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
$(ERL_CALL)
-clear-memory-alarm: all
- echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \
+clear-resource-alarm: all
+ echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ee07ebfe..ded3ab48 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -948,7 +948,7 @@
<term>source_kind</term>
<listitem><para>The kind of the source of messages to
which the binding is attached. Currently always
- queue. With non-ASCII characters escaped as in
+ exchange. With non-ASCII characters escaped as in
C.</para></listitem>
</varlistentry>
<varlistentry>
@@ -1074,8 +1074,8 @@
<varlistentry>
<term>last_blocked_by</term>
<listitem><para>The reason for which this connection
- was last blocked. One of 'mem' - due to a memory
- alarm, 'flow' - due to internal flow control, or
+ was last blocked. One of 'resource' - due to a memory
+ or disk alarm, 'flow' - due to internal flow control, or
'none' if the connection was never
blocked.</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index fd19051d..b7d14f20 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,6 +19,7 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
+ {disk_free_limit, {mem_relative, 1.0}},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
{frame_max, 131072},
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index fb02cd6a..e935acf5 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,6 +2,8 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
+Uploader: Emile Joubert <emile@rabbitmq.com>
+DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
diff --git a/src/dtree.erl b/src/dtree.erl
new file mode 100644
index 00000000..265bb340
--- /dev/null
+++ b/src/dtree.erl
@@ -0,0 +1,111 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+%% A dual-index tree.
+%%
+%% Conceptually, what we want is a map that has two distinct sets of
+%% keys (referred to here as primary and secondary, although that
+%% shouldn't imply a hierarchy) pointing to one set of
+%% values. However, in practice what we'll always want to do is insert
+%% a value that's pointed at by (one primary, many secondaries) and
+%% remove values that are pointed at by (one secondary, many
+%% primaries) or (one secondary, all primaries). Thus the API.
+%%
+%% Entries exists while they have a non-empty secondary key set. The
+%% 'take' operations return the entries that got removed, i.e. that
+%% had no remaining secondary keys. take/3 expects entries to exist
+%% with the supplied primary keys and secondary key. take/2 can cope
+%% with the supplied secondary key having no entries.
+
+-module(dtree).
+
+-export([empty/0, insert/4, take/3, take/2,
+ is_defined/2, is_empty/1, smallest/1, size/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: {gb_tree(), gb_tree()}).
+
+-type(pk() :: any()).
+-type(sk() :: any()).
+-type(val() :: any()).
+-type(kv() :: {pk(), val()}).
+
+-spec(empty/0 :: () -> ?MODULE()).
+-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()).
+-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+-spec(smallest/1 :: (?MODULE()) -> kv()).
+-spec(size/1 :: (?MODULE()) -> non_neg_integer()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+empty() -> {gb_trees:empty(), gb_trees:empty()}.
+
+insert(PK, SKs, V, {P, S}) ->
+ {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P),
+ lists:foldl(fun (SK, S0) ->
+ case gb_trees:lookup(SK, S0) of
+ {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS),
+ gb_trees:update(SK, PKS1, S0);
+ none -> PKS = gb_sets:singleton(PK),
+ gb_trees:insert(SK, PKS, S0)
+ end
+ end, S, SKs)}.
+
+take(PKs, SK, {P, S}) ->
+ {KVs, P1} = take2(PKs, SK, P),
+ PKS = gb_sets:difference(gb_trees:get(SK, S), gb_sets:from_list(PKs)),
+ {KVs, {P1, case gb_sets:is_empty(PKS) of
+ true -> gb_trees:delete(SK, S);
+ false -> gb_trees:update(SK, PKS, S)
+ end}}.
+
+take(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, P1} = take2(gb_sets:to_list(PKS), SK, P),
+ {KVs, {P1, gb_trees:delete(SK, S)}}
+ end.
+
+is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
+
+is_empty({P, _S}) -> gb_trees:is_empty(P).
+
+smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P),
+ {K, V}.
+
+size({P, _S}) -> gb_trees:size(P).
+
+%%----------------------------------------------------------------------------
+
+take2(PKs, SK, P) ->
+ lists:foldl(fun (PK, {KVs, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ SKS1 = gb_sets:delete(SK, SKS),
+ case gb_sets:is_empty(SKS1) of
+ true -> {[{PK, V} | KVs], gb_trees:delete(PK, P0)};
+ false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
+ end
+ end, {[], P}, PKs).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index dd5fb89c..eec7e34e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -327,7 +327,11 @@ status() ->
[{vm_memory_high_watermark, {vm_memory_monitor,
get_vm_memory_high_watermark, []}},
{vm_memory_limit, {vm_memory_monitor,
- get_memory_limit, []}}]),
+ get_memory_limit, []}},
+ {disk_free_limit, {rabbit_disk_monitor,
+ get_disk_free_limit, []}},
+ {disk_free, {rabbit_disk_monitor,
+ get_disk_free, []}}]),
S3 = rabbit_misc:with_exit_handler(
fun () -> [] end,
fun () -> [{file_descriptors, file_handle_cache:info()}] end),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 187ec1ab..04e0c141 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--export([remote_conserve_memory/2]). %% Internal use only
+-export([remote_conserve_resources/3]). %% Internal use only
-record(alarms, {alertees, alarmed_nodes}).
@@ -45,6 +45,9 @@ start() ->
ok = alarm_handler:add_alarm_handler(?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
+
+ {ok, DiskLimit} = application:get_env(disk_free_limit),
+ rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
stop() ->
@@ -61,41 +64,41 @@ on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
%% permit notifying a remote node.
-remote_conserve_memory(Pid, true) ->
+remote_conserve_resources(Pid, Source, true) ->
gen_event:notify({alarm_handler, node(Pid)},
- {set_alarm, {{vm_memory_high_watermark, node()}, []}});
-remote_conserve_memory(Pid, false) ->
+ {set_alarm, {{resource_limit, Source, node()}, []}});
+remote_conserve_resources(Pid, Source, false) ->
gen_event:notify({alarm_handler, node(Pid)},
- {clear_alarm, {vm_memory_high_watermark, node()}}).
+ {clear_alarm, {resource_limit, Source, node()}}).
%%----------------------------------------------------------------------------
init([]) ->
{ok, #alarms{alertees = dict:new(),
- alarmed_nodes = sets:new()}}.
+ alarmed_nodes = dict:new()}}.
handle_call({register, Pid, HighMemMFA}, State) ->
- {ok, 0 < sets:size(State#alarms.alarmed_nodes),
+ {ok, 0 < dict:size(State#alarms.alarmed_nodes),
internal_register(Pid, HighMemMFA, State)};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
- {ok, maybe_alert(fun sets:add_element/2, Node, State)};
+handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
+ {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
-handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
- {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
ok = gen_event:notify(
{alarm_handler, Node},
- {register, self(), {?MODULE, remote_conserve_memory, []}}),
+ {register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
handle_event({register, Pid, HighMemMFA}, State) ->
{ok, internal_register(Pid, HighMemMFA, State)};
@@ -118,34 +121,58 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
- alertees = Alertees}) ->
- AN1 = SetFun(Node, AN),
- BeforeSz = sets:size(AN),
- AfterSz = sets:size(AN1),
+dict_unappend_all(Key, _Val, Dict) ->
+ dict:erase(Key, Dict).
+
+dict_unappend(Key, Val, Dict) ->
+ case lists:delete(Val, dict:fetch(Key, Dict)) of
+ [] -> dict:erase(Key, Dict);
+ X -> dict:store(Key, X, Dict)
+ end.
+
+count_dict_values(Val, Dict) ->
+ dict:fold(fun (_Node, List, Count) ->
+ Count + case lists:member(Val, List) of
+ true -> 1;
+ false -> 0
+ end
+ end, 0, Dict).
+
+maybe_alert(UpdateFun, Node, Source,
+ State = #alarms{alarmed_nodes = AN,
+ alertees = Alertees}) ->
+ AN1 = UpdateFun(Node, Source, AN),
+ BeforeSz = count_dict_values(Source, AN),
+ AfterSz = count_dict_values(Source, AN1),
+
%% If we have changed our alarm state, inform the remotes.
IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees);
- IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
- true -> ok
+ if IsLocal andalso BeforeSz < AfterSz ->
+ ok = alert_remote(true, Alertees, Source);
+ IsLocal andalso BeforeSz > AfterSz ->
+ ok = alert_remote(false, Alertees, Source);
+ true ->
+ ok
end,
%% If the overall alarm state has changed, inform the locals.
- case {BeforeSz, AfterSz} of
- {0, 1} -> ok = alert_local(true, Alertees);
- {1, 0} -> ok = alert_local(false, Alertees);
+ case {dict:size(AN), dict:size(AN1)} of
+ {0, 1} -> ok = alert_local(true, Alertees, Source);
+ {1, 0} -> ok = alert_local(false, Alertees, Source);
{_, _} -> ok
end,
State#alarms{alarmed_nodes = AN1}.
-alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2).
+alert_local(Alert, Alertees, _Source) ->
+ alert(Alertees, [Alert], fun erlang:'=:='/2).
-alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
+alert_remote(Alert, Alertees, Source) ->
+ alert(Alertees, [Source, Alert], fun erlang:'=/='/2).
-alert(Alert, Alertees, NodeComparator) ->
+alert(Alertees, AlertArg, NodeComparator) ->
Node = node(),
dict:fold(fun (Pid, {M, F, A}, ok) ->
case NodeComparator(Node, node(Pid)) of
- true -> apply(M, F, A ++ [Pid, Alert]);
+ true -> apply(M, F, A ++ [Pid] ++ AlertArg);
false -> ok
end
end, ok, Alertees).
@@ -153,9 +180,9 @@ alert(Alert, Alertees, NodeComparator) ->
internal_register(Pid, {M, F, A} = HighMemMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
- case sets:is_element(node(), State#alarms.alarmed_nodes) of
- true -> ok = apply(M, F, A ++ [Pid, true]);
- false -> ok
+ case dict:find(node(), State#alarms.alarmed_nodes) of
+ {ok, _Sources} -> apply(M, F, A ++ [Pid, true]);
+ error -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
State#alarms{alertees = NewAlertees}.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d643cecd..2de020bb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,8 +48,7 @@
ttl,
ttl_timer_ref,
publish_seqno,
- unconfirmed_mq,
- unconfirmed_qm,
+ unconfirmed,
delayed_stop,
queue_monitors,
dlx,
@@ -135,8 +134,7 @@ init(Q) ->
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
@@ -161,8 +159,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
expiry_timer_ref = undefined,
ttl = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
@@ -731,11 +728,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
end.
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed_mq = UMQ,
- dlx = DLX,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC,
+ dlx = DLX}) ->
{ok, _, QPids} =
rabbit_basic:publish(
rabbit_basic:delivery(
@@ -744,20 +739,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
State2 = State1#q{publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS),
- cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
- _ -> State3 =
- lists:foldl(
- fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
- UQM1 = rabbit_misc:gb_trees_set_insert(
- QPid, MsgSeqNo, UQM),
- State0#q{unconfirmed_qm = UQM1}
- end, State2, QPids),
- noreply(State3#q{
- unconfirmed_mq =
- gb_trees:insert(
- MsgSeqNo, {gb_sets:from_list(QPids),
- AckTag}, UMQ)})
+ [] -> cleanup_after_confirm([AckTag], State2);
+ _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
+ noreply(State2#q{unconfirmed = UC1})
end.
monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
@@ -776,64 +760,31 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed_qm = UQM}) ->
+ unconfirmed = UC}) ->
case dict:find(QPid, QMons) of
error ->
noreply(State);
{ok, _} ->
rabbit_log:info("DLQ ~p (for ~s) died~n",
[QPid, rabbit_misc:rs(qname(State))]),
- State1 = State#q{queue_monitors = dict:erase(QPid, QMons)},
- case gb_trees:lookup(QPid, UQM) of
- none ->
- noreply(State1);
- {value, MsgSeqNosSet} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> rabbit_log:warning(
- "Dead queue lost ~p messages~n",
- [gb_sets:size(MsgSeqNosSet)]);
- false -> ok
- end,
- handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1)
- end
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ case (MsgSeqNoAckTags =/= [] andalso
+ rabbit_misc:is_abnormal_termination(Reason)) of
+ true -> rabbit_log:warning("Dead queue lost ~p messages~n",
+ [length(MsgSeqNoAckTags)]);
+ false -> ok
+ end,
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = dict:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
-handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckTags1, UMQ3} =
- lists:foldl(
- fun (MsgSeqNo, {AckTags, UMQ1}) ->
- {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
- QPids1 = gb_sets:delete(QPid, QPids),
- case gb_sets:is_empty(QPids1) of
- true -> {[AckTag | AckTags],
- gb_trees:delete(MsgSeqNo, UMQ1)};
- false -> {AckTags, gb_trees:update(
- MsgSeqNo, {QPids1, AckTag}, UMQ1)}
- end
- end, {[], UMQ}, MsgSeqNos),
- {_Guids, BQS1} = BQ:ack(AckTags1, BQS),
- MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
- gb_sets:from_list(MsgSeqNos)),
- State1 = case gb_sets:is_empty(MsgSeqNos1) of
- false -> State#q{
- unconfirmed_qm =
- gb_trees:update(QPid, MsgSeqNos1, UQM)};
- true -> demonitor_queue(
- QPid, State#q{
- unconfirmed_qm =
- gb_trees:delete(QPid, UQM)})
- end,
- cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
- backing_queue_state = BQS1}).
-
stop_later(Reason, State) ->
stop_later(Reason, undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
- case {gb_trees:is_empty(UMQ), Reply} of
+stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
+ case {dtree:is_empty(UC), Reply} of
{true, noreply} ->
{stop, Reason, State};
{true, _} ->
@@ -842,16 +793,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
end.
-cleanup_after_confirm(State = #q{delayed_stop = DS,
- unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) andalso DS =/= undefined of
+cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
+ unconfirmed = UC,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
{_, {_, noreply}} -> ok;
{_, {From, Reply}} -> gen_server2:reply(From, Reply)
end,
{Reason, _} = DS,
- {stop, Reason, State};
- false -> noreply(State)
+ {stop, Reason, State1};
+ false -> noreply(State1)
end.
already_been_here(_Delivery, #q{dlx = undefined}) ->
@@ -1232,8 +1187,14 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State) ->
- handle_confirm(MsgSeqNos, QPid, State);
+handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
+ {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ State1 = case dtree:is_defined(QPid, UC1) of
+ false -> demonitor_queue(QPid, State);
+ true -> State
+ end,
+ cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State1#q{unconfirmed = UC1});
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 25485ca0..a89aa074 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -199,9 +199,9 @@ replace_headers(Headers, Content = #content{properties = Props}) ->
indexof(L, Element) -> indexof(L, Element, 1).
-indexof([], _Element, _N) -> 0;
-indexof([Element | _Rest], Element, N) -> N;
-indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
+indexof([], _Element, _N) -> 0;
+indexof([Element | _Rest], Element, N) -> N;
+indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cac622f8..4a0e93be 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,8 +37,8 @@
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed,
+ confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
@@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
- record_confirms(MXs, State1).
-
-process_confirms(MsgSeqNos, QPid, Nack, State) ->
- lists:foldl(
- fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], State}, MsgSeqNos).
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
- {MXs, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}},
- Nack) ->
- State1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> UQM1 = gb_trees:delete(QPid, UQM),
- State#ch{unconfirmed_qm = UQM1};
- false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State#ch{unconfirmed_qm = UQM1}
- end;
- none ->
- State
- end,
- Qs1 = gb_sets:del_element(QPid, Qs),
- %% If QPid somehow died initiating a nack, clear the message from
- %% internal data-structures. Also, cleanup empty entries.
- case (Nack orelse gb_sets:is_empty(Qs1)) of
- true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
- {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
- false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
- {MXs, State1#ch{unconfirmed_mq = UMQ1}}
- end.
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1152,22 +1115,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
true -> State
end.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {Nack, SendFun} =
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {true, fun send_nacks/2};
- false -> {false, fun record_confirms/2}
- end,
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- SendFun(MXs, State2).
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(QPid, UC),
+ (case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> fun send_nacks/2;
+ false -> fun record_confirms/2
+ end)(MXs, State#ch{unconfirmed = UC1}).
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
@@ -1392,21 +1345,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ} = State,
- UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
- SingletonSet = gb_sets:singleton(MsgSeqNo),
- lists:foldl(
- fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
- case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State0#ch{unconfirmed_qm = UQM1};
- none ->
- UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- State0#ch{unconfirmed_qm = UQM1}
- end
- end, State#ch{unconfirmed_mq = UMQ1}, QPids).
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+ State#ch.unconfirmed)}.
send_nacks([], State) ->
State;
@@ -1444,11 +1384,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UMQ) of
+ CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
+ false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1462,8 +1402,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
State;
-maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) of
+maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
+ case dtree:is_empty(UC) of
false -> State;
true -> complete_tx(State#ch{confirmed = []})
end.
@@ -1491,8 +1431,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
- gb_trees:size(UMQ);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ dtree:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6a775adf..51f88c8f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -328,7 +328,7 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
0 -> Arg ++ ".0";
_ -> Arg
end),
- Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]),
+ Inform("Setting memory threshold on ~p to ~p", [Node, Frac]),
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
new file mode 100644
index 00000000..831d5290
--- /dev/null
+++ b/src/rabbit_disk_monitor.erl
@@ -0,0 +1,196 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_disk_monitor).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0,
+ set_check_interval/1, get_disk_free/0]).
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+
+-record(state, {dir,
+ limit,
+ timeout,
+ timer,
+ alarmed
+ }).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(disk_free_limit() :: (integer() | {'mem_relative', float()})).
+-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()).
+-spec(get_disk_free_limit/0 :: () -> integer()).
+-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok').
+-spec(get_check_interval/0 :: () -> integer()).
+-spec(set_check_interval/1 :: (integer()) -> 'ok').
+-spec(get_disk_free/0 :: () -> (integer() | 'unknown')).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+get_disk_free_limit() ->
+ gen_server:call(?MODULE, get_disk_free_limit, infinity).
+
+set_disk_free_limit(Limit) ->
+ gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
+
+get_check_interval() ->
+ gen_server:call(?MODULE, get_check_interval, infinity).
+
+set_check_interval(Interval) ->
+ gen_server:call(?MODULE, {set_check_interval, Interval}, infinity).
+
+get_disk_free() ->
+ gen_server:call(?MODULE, get_disk_free, infinity).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+init([Limit]) ->
+ TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL),
+ Dir = dir(),
+ State = #state { dir = Dir,
+ timeout = ?DEFAULT_DISK_CHECK_INTERVAL,
+ timer = TRef,
+ alarmed = false},
+ case {catch get_disk_free(Dir),
+ vm_memory_monitor:get_total_memory()} of
+ {N1, N2} when is_integer(N1), is_integer(N2) ->
+ {ok, set_disk_limits(State, Limit)};
+ _ ->
+ rabbit_log:info("Disabling disk free space monitoring "
+ "on unsupported platform~n"),
+ {stop, unsupported_platform}
+ end.
+
+handle_call(get_disk_free_limit, _From, State) ->
+ {reply, interpret_limit(State#state.limit), State};
+
+handle_call({set_disk_free_limit, Limit}, _From, State) ->
+ {reply, ok, set_disk_limits(State, Limit)};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(get_disk_free, _From, State = #state { dir = Dir }) ->
+ {reply, get_disk_free(Dir), State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+%% Server Internals
+%%----------------------------------------------------------------------------
+
+% the partition / drive containing this directory will be monitored
+dir() -> rabbit_mnesia:dir().
+
+set_disk_limits(State, Limit) ->
+ State1 = State#state { limit = Limit },
+ rabbit_log:info("Disk free limit set to ~pMB~n",
+ [trunc(interpret_limit(Limit) / 1048576)]),
+ internal_update(State1).
+
+internal_update(State = #state { limit = Limit,
+ dir = Dir,
+ alarmed = Alarmed}) ->
+ CurrentFreeBytes = get_disk_free(Dir),
+ LimitBytes = interpret_limit(Limit),
+ NewAlarmed = CurrentFreeBytes < LimitBytes,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
+ alarm_handler:set_alarm({{resource_limit, disk, node()}, []});
+ {true, false} ->
+ emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
+ alarm_handler:clear_alarm({resource_limit, disk, node()});
+ _ ->
+ ok
+ end,
+ State #state {alarmed = NewAlarmed}.
+
+get_disk_free(Dir) ->
+ get_disk_free(Dir, os:type()).
+
+get_disk_free(Dir, {unix, Sun})
+ when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
+ parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir));
+get_disk_free(Dir, {unix, _}) ->
+ parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
+get_disk_free(Dir, {win32, _}) ->
+ parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir));
+get_disk_free(_, _) ->
+ unknown.
+
+parse_free_unix(CommandResult) ->
+ [_, Stats | _] = string:tokens(CommandResult, "\n"),
+ [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"),
+ list_to_integer(Free) * 1024.
+
+parse_free_win32(CommandResult) ->
+ LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
+ [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "),
+ list_to_integer(Free).
+
+interpret_limit({mem_relative, R}) ->
+ round(R * vm_memory_monitor:get_total_memory());
+interpret_limit(L) ->
+ L.
+
+emit_update_info(State, CurrentFree, Limit) ->
+ rabbit_log:info(
+ "Disk free space limit now ~s. Free bytes:~p Limit:~p~n",
+ [State, CurrentFree, Limit]).
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:send_interval(Timeout, update),
+ TRef.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ddf7f574..c1be7613 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,8 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
- gb_trees_set_insert/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -60,6 +59,7 @@
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
-export([quit/1]).
+-export([os_cmd/1]).
%%----------------------------------------------------------------------------
@@ -176,7 +176,6 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
--spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
@@ -204,6 +203,7 @@
-spec(multi_call/2 ::
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
+-spec(os_cmd/1 :: (string()) -> string()).
-endif.
@@ -717,15 +717,6 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
-gb_trees_set_insert(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} ->
- Values1 = gb_sets:insert(Value, Values),
- gb_trees:update(Key, Values1, Tree);
- none ->
- gb_trees:insert(Key, gb_sets:singleton(Value), Tree)
- end.
-
gb_trees_fold(Fun, Acc, Tree) ->
gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).
@@ -914,3 +905,10 @@ quit(Status) ->
{unix, _} -> halt(Status);
{win32, _} -> init:stop(Status)
end.
+
+os_cmd(Command) ->
+ Exec = hd(string:tokens(Command, " ")),
+ case os:find_executable(Exec) of
+ false -> throw({command_not_found, Exec});
+ _ -> os:cmd(Command)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 47e796dc..5e9e78d3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/1]).
+-export([conserve_resources/2, server_properties/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -38,7 +38,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_memory,
+ auth_mechanism, auth_state, conserve_resources,
last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -133,8 +133,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
+conserve_resources(Pid, Conserve) ->
+ Pid ! {conserve_resources, Conserve},
ok.
server_properties(Protocol) ->
@@ -218,7 +218,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf_len = 0,
auth_mechanism = none,
auth_state = none,
- conserve_memory = false,
+ conserve_resources = false,
last_blocked_by = none,
last_blocked_at = never},
try
@@ -276,8 +276,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb, State) ->
+ recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -358,8 +358,8 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_memory = Mem}) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_resources = Mem}) ->
case {CS, Mem orelse credit_flow:blocked()} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
@@ -377,9 +377,9 @@ maybe_block(State = #v1{connection_state = blocking}) ->
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_memory = true}) ->
- State#v1{last_blocked_by = mem};
-update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+update_last_blocked_by(State = #v1{conserve_resources = true}) ->
+ State#v1{last_blocked_by = resource};
+update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
close_connection(State = #v1{queue_collector = Collector,
@@ -701,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- conserve_memory = Conserve}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ conserve_resources = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 0965e3b3..bf2b4798 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -52,22 +52,20 @@ start_child(Mod, Args) ->
start_child(Mod, Mod, Args).
start_child(ChildId, Mod, Args) ->
- {ok, _} = supervisor:start_child(?SERVER,
- {ChildId, {Mod, start_link, Args},
- transient, ?MAX_WAIT, worker, [Mod]}),
- ok.
+ child_reply(supervisor:start_child(?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, ?MAX_WAIT, worker, [Mod]})).
start_restartable_child(Mod) ->
start_restartable_child(Mod, []).
start_restartable_child(Mod, Args) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
- {ok, _} = supervisor:start_child(
- ?SERVER,
- {Name, {rabbit_restartable_sup, start_link,
- [Name, {Mod, start_link, Args}]},
- transient, infinity, supervisor, [rabbit_restartable_sup]}),
- ok.
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {Name, {rabbit_restartable_sup, start_link,
+ [Name, {Mod, start_link, Args}]},
+ transient, infinity, supervisor, [rabbit_restartable_sup]})).
stop_child(ChildId) ->
case supervisor:terminate_child(?SERVER, ChildId) of
@@ -77,3 +75,9 @@ stop_child(ChildId) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
+
+
+%%----------------------------------------------------------------------------
+
+child_reply({ok, _}) -> ok;
+child_reply(X) -> X.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 55e4a6f8..e356d7ff 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -883,6 +883,8 @@ test_cluster_management() ->
"invalid2@invalid"]),
ok = assert_ram_node(),
+ ok = control_action(reset, []),
+
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
@@ -898,7 +900,6 @@ test_cluster_management2(SecondaryNode) ->
SecondaryNodeS = atom_to_list(SecondaryNode),
%% make a disk node
- ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
ok = assert_disc_node(),
%% make a ram node
@@ -1244,6 +1245,9 @@ test_confirms() ->
},
rabbit_basic:build_content(
#'P_basic'{delivery_mode = 2}, <<"">>)),
+ %% We must not kill the queue before the channel has processed the
+ %% 'publish'.
+ ok = rabbit_channel:flush(Ch),
%% Crash the queue
QPid1 ! boom,
%% Wait for a nack
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index fca55f02..fb184d1a 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -181,10 +181,10 @@ internal_update(State = #state { memory_limit = MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
+ alarm_handler:set_alarm({{resource_limit, memory, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
+ alarm_handler:clear_alarm({resource_limit, memory, node()});
_ ->
ok
end,