summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-04-11 22:39:35 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-04-11 22:39:35 +0100
commit93b9c9f5c282418e66091200490090a82cdfe249 (patch)
tree946edb229c90fab30808fc4f00295d38e89eaf02
parentacb9abe971cef1f6dec482d8e351c1b62de24e08 (diff)
parent797c633e2a56ffed0ae338979471a97bc7f26bbb (diff)
downloadrabbitmq-server-93b9c9f5c282418e66091200490090a82cdfe249.tar.gz
merge bug24769
-rw-r--r--Makefile8
-rw-r--r--docs/rabbitmqctl.1.xml6
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit_backing_queue_spec.hrl7
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/postrm.in9
-rw-r--r--src/dtree.erl160
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_alarm.erl89
-rw-r--r--src/rabbit_amqqueue.erl125
-rw-r--r--src/rabbit_amqqueue_process.erl309
-rw-r--r--src/rabbit_basic.erl19
-rw-r--r--src/rabbit_channel.erl103
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_disk_monitor.erl196
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_misc.erl22
-rw-r--r--src/rabbit_nodes.erl16
-rw-r--r--src/rabbit_reader.erl34
-rw-r--r--src/rabbit_sup.erl24
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_writer.erl110
-rw-r--r--src/vm_memory_monitor.erl4
25 files changed, 803 insertions, 473 deletions
diff --git a/Makefile b/Makefile
index e8975856..d16cd4d0 100644
--- a/Makefile
+++ b/Makefile
@@ -216,12 +216,12 @@ start-rabbit-on-node: all
stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
-set-memory-alarm: all
- echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \
+set-resource-alarm: all
+ echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
$(ERL_CALL)
-clear-memory-alarm: all
- echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \
+clear-resource-alarm: all
+ echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ee07ebfe..ded3ab48 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -948,7 +948,7 @@
<term>source_kind</term>
<listitem><para>The kind of the source of messages to
which the binding is attached. Currently always
- queue. With non-ASCII characters escaped as in
+ exchange. With non-ASCII characters escaped as in
C.</para></listitem>
</varlistentry>
<varlistentry>
@@ -1074,8 +1074,8 @@
<varlistentry>
<term>last_blocked_by</term>
<listitem><para>The reason for which this connection
- was last blocked. One of 'mem' - due to a memory
- alarm, 'flow' - due to internal flow control, or
+ was last blocked. One of 'resource' - due to a memory
+ or disk alarm, 'flow' - due to internal flow control, or
'none' if the connection was never
blocked.</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index fd19051d..b7d14f20 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,6 +19,7 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
+ {disk_free_limit, {mem_relative, 1.0}},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
{frame_max, 131072},
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index bf93baba..79d44e1b 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -25,7 +25,8 @@
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok')).
+-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
+ 'undefined').
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -45,8 +46,8 @@
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
-spec(dropwhile/3 ::
- (fun ((rabbit_types:message_properties()) -> boolean()),
- msg_fun() | 'undefined', state())
+ (fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
+ state())
-> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index fb02cd6a..e935acf5 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,6 +2,8 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
+Uploader: Emile Joubert <emile@rabbitmq.com>
+DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in
index baf081fc..c2e9bbfe 100644
--- a/packaging/debs/Debian/debian/postrm.in
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -35,20 +35,15 @@ case "$1" in
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
- remove_plugin_traces
+ remove_plugin_traces
if getent passwd rabbitmq >/dev/null; then
# Stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
-
- deluser rabbitmq
- fi
- if getent group rabbitmq >/dev/null; then
- delgroup rabbitmq
fi
;;
remove|upgrade)
- remove_plugin_traces
+ remove_plugin_traces
;;
failed-upgrade|abort-install|abort-upgrade|disappear)
diff --git a/src/dtree.erl b/src/dtree.erl
new file mode 100644
index 00000000..67bbbc1b
--- /dev/null
+++ b/src/dtree.erl
@@ -0,0 +1,160 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+%% A dual-index tree.
+%%
+%% Entries have the following shape:
+%%
+%% +----+--------------------+---+
+%% | PK | SK1, SK2, ..., SKN | V |
+%% +----+--------------------+---+
+%%
+%% i.e. a primary key, set of secondary keys, and a value.
+%%
+%% There can be only one entry per primary key, but secondary keys may
+%% appear in multiple entries.
+%%
+%% The set of secondary keys must be non-empty. Or, to put it another
+%% way, entries only exist while their secondary key set is non-empty.
+
+-module(dtree).
+
+-export([empty/0, insert/4, take/3, take/2, take_all/2,
+ is_defined/2, is_empty/1, smallest/1, size/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: {gb_tree(), gb_tree()}).
+
+-type(pk() :: any()).
+-type(sk() :: any()).
+-type(val() :: any()).
+-type(kv() :: {pk(), val()}).
+
+-spec(empty/0 :: () -> ?MODULE()).
+-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()).
+-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+-spec(smallest/1 :: (?MODULE()) -> kv()).
+-spec(size/1 :: (?MODULE()) -> non_neg_integer()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+empty() -> {gb_trees:empty(), gb_trees:empty()}.
+
+%% Insert an entry. Fails if there already is an entry with the given
+%% primary key.
+insert(PK, [], V, {P, S}) ->
+ %% dummy insert to force error if PK exists
+ gb_trees:insert(PK, {gb_sets:empty(), V}, P),
+ {P, S};
+insert(PK, SKs, V, {P, S}) ->
+ {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P),
+ lists:foldl(fun (SK, S0) ->
+ case gb_trees:lookup(SK, S0) of
+ {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS),
+ gb_trees:update(SK, PKS1, S0);
+ none -> PKS = gb_sets:singleton(PK),
+ gb_trees:insert(SK, PKS, S0)
+ end
+ end, S, SKs)}.
+
+%% Remove the given secondary key from the entries of the given
+%% primary keys, returning the primary-key/value pairs of any entries
+%% that were dropped as the result (i.e. due to their secondary key
+%% set becoming empty). It is ok for the given primary keys and/or
+%% secondary key to not exist.
+take(PKs, SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> TakenPKS = gb_sets:from_list(PKs),
+ PKSInter = gb_sets:intersection(PKS, TakenPKS),
+ PKSDiff = gb_sets:difference (PKS, TakenPKS),
+ {KVs, P1} = take2(PKSInter, SK, P),
+ {KVs, {P1, case gb_sets:is_empty(PKSDiff) of
+ true -> gb_trees:delete(SK, S);
+ false -> gb_trees:update(SK, PKSDiff, S)
+ end}}
+ end.
+
+%% Remove the given secondary key from all entries, returning the
+%% primary-key/value pairs of any entries that were dropped as the
+%% result (i.e. due to their secondary key set becoming empty). It is
+%% ok for the given secondary key to not exist.
+take(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, P1} = take2(PKS, SK, P),
+ {KVs, {P1, gb_trees:delete(SK, S)}}
+ end.
+
+%% Drop all entries which contain the given secondary key, returning
+%% the primary-key/value pairs of these entries. It is ok for the
+%% given secondary key to not exist.
+take_all(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P),
+ {KVs, {P1, prune(SKS, PKS, S)}}
+ end.
+
+is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
+
+is_empty({P, _S}) -> gb_trees:is_empty(P).
+
+smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P),
+ {K, V}.
+
+size({P, _S}) -> gb_trees:size(P).
+
+%%----------------------------------------------------------------------------
+
+take2(PKS, SK, P) ->
+ gb_sets:fold(fun (PK, {KVs, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ SKS1 = gb_sets:delete(SK, SKS),
+ case gb_sets:is_empty(SKS1) of
+ true -> KVs1 = [{PK, V} | KVs],
+ {KVs1, gb_trees:delete(PK, P0)};
+ false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
+ end
+ end, {[], P}, PKS).
+
+take_all2(PKS, P) ->
+ gb_sets:fold(fun (PK, {KVs, SKS0, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ {[{PK, V} | KVs], gb_sets:union(SKS, SKS0),
+ gb_trees:delete(PK, P0)}
+ end, {[], gb_sets:empty(), P}, PKS).
+
+prune(SKS, PKS, S) ->
+ gb_sets:fold(fun (SK0, S0) ->
+ PKS1 = gb_trees:get(SK0, S0),
+ PKS2 = gb_sets:difference(PKS1, PKS),
+ case gb_sets:is_empty(PKS2) of
+ true -> gb_trees:delete(SK0, S0);
+ false -> gb_trees:update(SK0, PKS2, S0)
+ end
+ end, S, SKS).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index dd5fb89c..eec7e34e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -327,7 +327,11 @@ status() ->
[{vm_memory_high_watermark, {vm_memory_monitor,
get_vm_memory_high_watermark, []}},
{vm_memory_limit, {vm_memory_monitor,
- get_memory_limit, []}}]),
+ get_memory_limit, []}},
+ {disk_free_limit, {rabbit_disk_monitor,
+ get_disk_free_limit, []}},
+ {disk_free, {rabbit_disk_monitor,
+ get_disk_free, []}}]),
S3 = rabbit_misc:with_exit_handler(
fun () -> [] end,
fun () -> [{file_descriptors, file_handle_cache:info()}] end),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 187ec1ab..04e0c141 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--export([remote_conserve_memory/2]). %% Internal use only
+-export([remote_conserve_resources/3]). %% Internal use only
-record(alarms, {alertees, alarmed_nodes}).
@@ -45,6 +45,9 @@ start() ->
ok = alarm_handler:add_alarm_handler(?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
+
+ {ok, DiskLimit} = application:get_env(disk_free_limit),
+ rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
stop() ->
@@ -61,41 +64,41 @@ on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
%% permit notifying a remote node.
-remote_conserve_memory(Pid, true) ->
+remote_conserve_resources(Pid, Source, true) ->
gen_event:notify({alarm_handler, node(Pid)},
- {set_alarm, {{vm_memory_high_watermark, node()}, []}});
-remote_conserve_memory(Pid, false) ->
+ {set_alarm, {{resource_limit, Source, node()}, []}});
+remote_conserve_resources(Pid, Source, false) ->
gen_event:notify({alarm_handler, node(Pid)},
- {clear_alarm, {vm_memory_high_watermark, node()}}).
+ {clear_alarm, {resource_limit, Source, node()}}).
%%----------------------------------------------------------------------------
init([]) ->
{ok, #alarms{alertees = dict:new(),
- alarmed_nodes = sets:new()}}.
+ alarmed_nodes = dict:new()}}.
handle_call({register, Pid, HighMemMFA}, State) ->
- {ok, 0 < sets:size(State#alarms.alarmed_nodes),
+ {ok, 0 < dict:size(State#alarms.alarmed_nodes),
internal_register(Pid, HighMemMFA, State)};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
- {ok, maybe_alert(fun sets:add_element/2, Node, State)};
+handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
+ {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
-handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
- {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
ok = gen_event:notify(
{alarm_handler, Node},
- {register, self(), {?MODULE, remote_conserve_memory, []}}),
+ {register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
handle_event({register, Pid, HighMemMFA}, State) ->
{ok, internal_register(Pid, HighMemMFA, State)};
@@ -118,34 +121,58 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
- alertees = Alertees}) ->
- AN1 = SetFun(Node, AN),
- BeforeSz = sets:size(AN),
- AfterSz = sets:size(AN1),
+dict_unappend_all(Key, _Val, Dict) ->
+ dict:erase(Key, Dict).
+
+dict_unappend(Key, Val, Dict) ->
+ case lists:delete(Val, dict:fetch(Key, Dict)) of
+ [] -> dict:erase(Key, Dict);
+ X -> dict:store(Key, X, Dict)
+ end.
+
+count_dict_values(Val, Dict) ->
+ dict:fold(fun (_Node, List, Count) ->
+ Count + case lists:member(Val, List) of
+ true -> 1;
+ false -> 0
+ end
+ end, 0, Dict).
+
+maybe_alert(UpdateFun, Node, Source,
+ State = #alarms{alarmed_nodes = AN,
+ alertees = Alertees}) ->
+ AN1 = UpdateFun(Node, Source, AN),
+ BeforeSz = count_dict_values(Source, AN),
+ AfterSz = count_dict_values(Source, AN1),
+
%% If we have changed our alarm state, inform the remotes.
IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees);
- IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
- true -> ok
+ if IsLocal andalso BeforeSz < AfterSz ->
+ ok = alert_remote(true, Alertees, Source);
+ IsLocal andalso BeforeSz > AfterSz ->
+ ok = alert_remote(false, Alertees, Source);
+ true ->
+ ok
end,
%% If the overall alarm state has changed, inform the locals.
- case {BeforeSz, AfterSz} of
- {0, 1} -> ok = alert_local(true, Alertees);
- {1, 0} -> ok = alert_local(false, Alertees);
+ case {dict:size(AN), dict:size(AN1)} of
+ {0, 1} -> ok = alert_local(true, Alertees, Source);
+ {1, 0} -> ok = alert_local(false, Alertees, Source);
{_, _} -> ok
end,
State#alarms{alarmed_nodes = AN1}.
-alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2).
+alert_local(Alert, Alertees, _Source) ->
+ alert(Alertees, [Alert], fun erlang:'=:='/2).
-alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
+alert_remote(Alert, Alertees, Source) ->
+ alert(Alertees, [Source, Alert], fun erlang:'=/='/2).
-alert(Alert, Alertees, NodeComparator) ->
+alert(Alertees, AlertArg, NodeComparator) ->
Node = node(),
dict:fold(fun (Pid, {M, F, A}, ok) ->
case NodeComparator(Node, node(Pid)) of
- true -> apply(M, F, A ++ [Pid, Alert]);
+ true -> apply(M, F, A ++ [Pid] ++ AlertArg);
false -> ok
end
end, ok, Alertees).
@@ -153,9 +180,9 @@ alert(Alert, Alertees, NodeComparator) ->
internal_register(Pid, {M, F, A} = HighMemMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
- case sets:is_element(node(), State#alarms.alarmed_nodes) of
- true -> ok = apply(M, F, A ++ [Pid, true]);
- false -> ok
+ case dict:find(node(), State#alarms.alarmed_nodes) of
+ {ok, _Sources} -> apply(M, F, A ++ [Pid, true]);
+ error -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
State#alarms{alertees = NewAlertees}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7c20f67d..c1673504 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,7 +32,7 @@
%% internal
--export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -144,11 +144,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/1 ::
- (name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/2 ::
+ (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -231,7 +231,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName),
+ false -> TailFun = internal_delete(QueueName, QPid),
fun () -> TailFun(), ExistingQ end
end
end
@@ -330,54 +330,60 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
[<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of
- ok -> ok;
- {error, Error} -> rabbit_misc:protocol_error(
- precondition_failed,
- "invalid arg '~s' for ~s: ~255p",
- [Key, rabbit_misc:rs(QueueName), Error])
- end ||
- {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/2},
- {<<"x-message-ttl">>, fun check_integer_argument/2},
- {<<"x-ha-policy">>, fun check_ha_policy_argument/2},
- {<<"x-dead-letter-exchange">>, fun check_string_argument/2},
- {<<"x-dead-letter-routing-key">>,
- fun check_dlxrk_argument/2}]],
+ Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
+ {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
+ {<<"x-ha-policy">>, fun check_ha_policy_arg/2},
+ {<<"x-dead-letter-exchange">>, fun check_string_arg/2},
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
+ [case rabbit_misc:table_lookup(Args, Key) of
+ undefined -> ok;
+ TypeVal -> case Fun(TypeVal, Args) of
+ ok -> ok;
+ {error, Error} -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "invalid arg '~s' for ~s: ~255p",
+ [Key, rabbit_misc:rs(QueueName),
+ Error])
+ end
+ end || {Key, Fun} <- Checks],
ok.
-check_string_argument(undefined, _Args) ->
- ok;
-check_string_argument({longstr, _}, _Args) ->
+check_string_arg({longstr, _}, _Args) ->
ok;
-check_string_argument({Type, _}, _) ->
+check_string_arg({Type, _}, _) ->
{error, {unacceptable_type, Type}}.
-check_integer_argument(undefined, _Args) ->
- ok;
-check_integer_argument({Type, Val}, _Args) when Val > 0 ->
+check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
- end;
-check_integer_argument({_Type, Val}, _Args) ->
- {error, {value_zero_or_less, Val}}.
+ end.
-check_dlxrk_argument(undefined, _Args) ->
- ok;
-check_dlxrk_argument({longstr, _}, Args) ->
+check_positive_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
+ end.
+
+check_non_neg_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
+ end.
+
+check_dlxrk_arg({longstr, _}, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
-check_dlxrk_argument({Type, _}, _Args) ->
+check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-check_ha_policy_argument(undefined, _Args) ->
- ok;
-check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
+check_ha_policy_arg({longstr, <<"all">>}, _Args) ->
ok;
-check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
+check_ha_policy_arg({longstr, <<"nodes">>}, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
undefined ->
{error, {require, 'x-ha-policy-params'}};
@@ -393,9 +399,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
{Type, _} ->
{error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
end;
-check_ha_policy_argument({longstr, Policy}, _Args) ->
+check_ha_policy_arg({longstr, Policy}, _Args) ->
{error, {invalid_ha_policy, Policy}};
-check_ha_policy_argument({Type, _}, _Args) ->
+check_ha_policy_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
list() ->
@@ -535,13 +541,19 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName) ->
+internal_delete(QueueName, QPid) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- rabbit_binding:process_deletions(Deletions)
+ T = rabbit_binding:process_deletions(Deletions),
+ fun() ->
+ ok = T(),
+ ok = rabbit_event:notify(queue_deleted,
+ [{pid, QPid},
+ {name, QueueName}])
+ end
end
end).
@@ -556,14 +568,25 @@ set_maximum_since_use(QPid, Age) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid,
- slave_pids = []}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
- rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels))
+ fun () -> QsDels =
+ qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ #amqqueue{name = QName, pid = Pid,
+ slave_pids = []}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node])),
+ {Qs, Dels} = lists:unzip(QsDels),
+ T = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels)),
+ fun () ->
+ T(),
+ lists:foreach(
+ fun({QName, QPid}) ->
+ ok = rabbit_event:notify(queue_deleted,
+ [{pid, QPid},
+ {name, QName}])
+ end, Qs)
+ end
end).
delete_queue(QueueName) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 61794fd0..f03f40a2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -24,9 +24,6 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(BASE_MESSAGE_PROPERTIES,
- #message_properties{expiry = undefined, needs_confirming = false}).
-
-export([start_link/1, info_keys/0]).
-export([init_with_backing_queue_state/8]).
@@ -52,8 +49,7 @@
ttl_timer_ref,
senders,
publish_seqno,
- unconfirmed_mq,
- unconfirmed_qm,
+ unconfirmed,
delayed_stop,
queue_monitors,
dlx,
@@ -140,8 +136,7 @@ init(Q) ->
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty()},
@@ -167,8 +162,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
ttl = undefined,
senders = Senders,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
@@ -186,16 +180,13 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate(Reason, State = #q{q = #amqqueue{name = QName},
backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
- terminate_shutdown(fun (BQS) ->
- rabbit_event:notify(
- queue_deleted, [{pid, self()},
- {name, QName}]),
- BQS1 = BQ:delete_and_terminate(Reason, BQS),
- %% don't care if the internal delete
- %% doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(qname(State)),
- BQS1
- end, State).
+ terminate_shutdown(
+ fun (BQS) ->
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
+ %% don't care if the internal delete doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(QName, self()),
+ BQS1
+ end, State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -336,12 +327,10 @@ ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
- true ->
- NewState = stop_expiry_timer(State),
- TRef = erlang:send_after(Expires, self(), maybe_expire),
- NewState#q{expiry_timer_ref = TRef};
- false ->
- State
+ true -> NewState = stop_expiry_timer(State),
+ TRef = erlang:send_after(Expires, self(), maybe_expire),
+ NewState#q{expiry_timer_ref = TRef};
+ false -> State
end.
ensure_stats_timer(State) ->
@@ -501,8 +490,10 @@ should_confirm_message(#delivery{sender = SenderPid,
id = MsgId}},
#q{q = #amqqueue{durable = true}}) ->
{eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(_Delivery, _State) ->
- immediately.
+should_confirm_message(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo},
+ _State) ->
+ {immediately, SenderPid, MsgSeqNo}.
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
@@ -511,6 +502,9 @@ maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
State#q{msg_id_to_channel =
gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
+maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ State;
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -522,58 +516,50 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(Delivery = #delivery{sender = SenderPid,
- message = Message,
- msg_seq_no = MsgSeqNo},
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- Confirm = should_confirm_message(Delivery, State),
- case Confirm of
- immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
- _ -> ok
- end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- DeliverFun =
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- %% we don't need an expiry here because
- %% messages are not being enqueued, so we use
- %% an empty message_properties.
- {AckTag, BQS3} =
- BQ:publish_delivered(
- AckRequired, Message,
- (?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = needs_confirming(Confirm)},
- SenderPid, BQS2),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
- end,
- {Delivered, State2} =
- deliver_msgs_to_consumers(DeliverFun, false,
- State#q{backing_queue_state = BQS1}),
- {Delivered, Confirm, State2};
+ deliver_msgs_to_consumers(
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ Props = message_properties(Confirm, State1),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ AckRequired, Message, Props,
+ SenderPid, BQS2),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS3}}
+ end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
%% it must have been seen under the same circumstances as
%% now: i.e. if it is now a deliver_immediately then it
%% must have been before.
- Delivered = case Duplicate of
- published -> true;
- discarded -> false
- end,
- {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ {case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
- case Delivered of
- true -> State2;
- false -> Props = (message_properties(State)) #message_properties{
- needs_confirming = needs_confirming(Confirm)},
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid}, State) ->
+ Confirm = should_confirm_message(Delivery, State),
+ case attempt_delivery(Delivery, Confirm, State) of
+ {true, State1} ->
+ maybe_record_confirm_message(Confirm, State1);
+ %% the next two are optimisations
+ {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
+ discard_delivery(Delivery, State1);
+ {false, State1 = #q{ttl = 0, dlx = undefined}} ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ discard_delivery(Delivery, State1);
+ {false, State1} ->
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ Props = message_properties(Confirm, State2),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -700,8 +686,9 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(#q{ttl=TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL)}.
+message_properties(Confirm, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(TTL),
+ needs_confirming = needs_confirming(Confirm)}.
calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
@@ -739,18 +726,15 @@ dead_letter_fun(Reason, _State) ->
dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
case rabbit_exchange:lookup(DLX) of
- {error, not_found} ->
- noreply(State);
- _ ->
- dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State)
+ {error, not_found} -> noreply(State);
+ _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
+ State)
end.
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed_mq = UMQ,
- dlx = DLX,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC,
+ dlx = DLX}) ->
{ok, _, QPids} =
rabbit_basic:publish(
rabbit_basic:delivery(
@@ -760,81 +744,36 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
QPids, State#q.queue_monitors),
publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS),
- cleanup_after_confirm(State1#q{backing_queue_state = BQS1});
- _ -> State2 =
- lists:foldl(
- fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
- UQM1 = rabbit_misc:gb_trees_set_insert(
- QPid, MsgSeqNo, UQM),
- State0#q{unconfirmed_qm = UQM1}
- end, State1, QPids),
- noreply(State2#q{
- unconfirmed_mq =
- gb_trees:insert(
- MsgSeqNo, {gb_sets:from_list(QPids),
- AckTag}, UMQ)})
+ [] -> cleanup_after_confirm([AckTag], State1);
+ _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
+ noreply(State1#q{unconfirmed = UC1})
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed_qm = UQM}) ->
+ unconfirmed = UC}) ->
case pmon:is_monitored(QPid, QMons) of
- false -> noreply(State);
- true -> rabbit_log:info("DLQ ~p (for ~s) died~n",
- [QPid, rabbit_misc:rs(qname(State))]),
- State1 = State#q{queue_monitors = pmon:erase(QPid, QMons)},
- case gb_trees:lookup(QPid, UQM) of
- none ->
- noreply(State1);
- {value, MsgSeqNosSet} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> rabbit_log:warning(
- "Dead queue lost ~p messages~n",
- [gb_sets:size(MsgSeqNosSet)]);
- false -> ok
- end,
- handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid,
- State1)
- end
+ false ->
+ noreply(State);
+ true ->
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
+ rabbit_log:warning(
+ "DLQ ~p for ~s died with ~p unconfirmed messages~n",
+ [QPid, rabbit_misc:rs(qname(State)), length(Lost)]);
+ false -> ok
+ end,
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = pmon:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
-handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckTags1, UMQ3} =
- lists:foldl(
- fun (MsgSeqNo, {AckTags, UMQ1}) ->
- {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
- QPids1 = gb_sets:delete(QPid, QPids),
- case gb_sets:is_empty(QPids1) of
- true -> {[AckTag | AckTags],
- gb_trees:delete(MsgSeqNo, UMQ1)};
- false -> {AckTags, gb_trees:update(
- MsgSeqNo, {QPids1, AckTag}, UMQ1)}
- end
- end, {[], UMQ}, MsgSeqNos),
- {_Guids, BQS1} = BQ:ack(AckTags1, BQS),
- MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
- gb_sets:from_list(MsgSeqNos)),
- State1 = case gb_sets:is_empty(MsgSeqNos1) of
- false -> State#q{
- unconfirmed_qm =
- gb_trees:update(QPid, MsgSeqNos1, UQM)};
- true -> State#q{
- queue_monitors =
- pmon:demonitor(QPid, State#q.queue_monitors),
- unconfirmed_qm =
- gb_trees:delete(QPid, UQM)}
- end,
- cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
- backing_queue_state = BQS1}).
-
stop_later(Reason, State) ->
stop_later(Reason, undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
- case {gb_trees:is_empty(UMQ), Reply} of
+stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
+ case {dtree:is_empty(UC), Reply} of
{true, noreply} ->
{stop, Reason, State};
{true, _} ->
@@ -843,16 +782,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
end.
-cleanup_after_confirm(State = #q{delayed_stop = DS,
- unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) andalso DS =/= undefined of
+cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
+ unconfirmed = UC,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
{_, {_, noreply}} -> ok;
{_, {From, Reply}} -> gen_server2:reply(From, Reply)
end,
{Reason, _} = DS,
- {stop, Reason, State};
- false -> noreply(State)
+ {stop, Reason, State1};
+ false -> noreply(State1)
end.
already_been_here(_Delivery, #q{dlx = undefined}) ->
@@ -885,29 +828,33 @@ make_dead_letter_msg(DLX, Reason,
exchange_name = Exchange,
routing_keys = RoutingKeys},
State = #q{dlx_routing_key = DlxRoutingKey}) ->
- Headers = rabbit_basic:extract_headers(Content),
- #resource{name = QName} = qname(State),
- %% The first routing key is the one specified in the
- %% basic.publish; all others are CC or BCC keys.
- RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}],
- Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers),
- {DeathRoutingKeys, Headers2} =
+ {DeathRoutingKeys, HeadersFun1} =
case DlxRoutingKey of
- undefined -> {RoutingKeys, Headers1};
+ undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[DlxRoutingKey],
- lists:keydelete(<<"CC">>, 1, Headers1)}
+ fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ end,
+ #resource{name = QName} = qname(State),
+ HeadersFun2 =
+ fun (Headers) ->
+ %% The first routing key is the one specified in the
+ %% basic.publish; all others are CC or BCC keys.
+ RoutingKeys1 =
+ [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ Info = [{<<"reason">>,
+ longstr, list_to_binary(atom_to_list(Reason))},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array,
+ [{longstr, Key} || Key <- RoutingKeys1]}],
+ HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
+ Info, Headers))
end,
- Content1 = rabbit_basic:replace_headers(Headers2, Content),
+ Content1 = rabbit_basic:map_headers(HeadersFun2, Content),
Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
routing_keys = DeathRoutingKeys, content = Content1}.
-
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) ->
@@ -1098,7 +1045,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ Confirm = should_confirm_message(Delivery, State),
+ {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
reply(Delivered, case Delivered of
true -> maybe_record_confirm_message(Confirm, State1);
false -> discard_delivery(Delivery, State1)
@@ -1233,8 +1181,15 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State) ->
- handle_confirm(MsgSeqNos, QPid, State);
+handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
+ {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ State1 = case dtree:is_defined(QPid, UC1) of
+ false -> QMons = State#q.queue_monitors,
+ State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
+ true -> State
+ end,
+ cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State1#q{unconfirmed = UC1});
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
@@ -1271,14 +1226,14 @@ handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case Requeue of
- true -> requeue_and_run(AckTags, State1);
- false -> Fun = dead_letter_fun(rejected, State),
+ case Requeue of
+ true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
+ false -> Fun = dead_letter_fun(rejected, State),
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
BQS1 = BQ:fold(Fun, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
- end
+ end
end));
handle_cast(delete_immediately, State) ->
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 25485ca0..8ad59016 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,7 @@
-export([publish/4, publish/6, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, replace_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -63,8 +63,8 @@
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
--spec(replace_headers/2 :: (headers(), rabbit_types:content())
- -> rabbit_types:content()).
+-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
+ -> rabbit_types:content()).
-spec(header_routes/1 ::
(undefined | rabbit_framing:amqp_table()) -> [string()]).
@@ -193,15 +193,18 @@ extract_headers(Content) ->
rabbit_binary_parser:ensure_content_decoded(Content),
Headers.
-replace_headers(Headers, Content = #content{properties = Props}) ->
+map_headers(F, Content) ->
+ Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
+ #content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
+ Headers1 = F(Headers),
rabbit_binary_generator:clear_encoded_content(
- Content#content{properties = Props#'P_basic'{headers = Headers}}).
+ Content1#content{properties = Props#'P_basic'{headers = Headers1}}).
indexof(L, Element) -> indexof(L, Element, 1).
-indexof([], _Element, _N) -> 0;
-indexof([Element | _Rest], Element, N) -> N;
-indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
+indexof([], _Element, _N) -> 0;
+indexof([Element | _Rest], Element, N) -> N;
+indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 081af766..2245cf65 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,8 +37,8 @@
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed,
+ confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
@@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
- record_confirms(MXs, State1).
-
-process_confirms(MsgSeqNos, QPid, Nack, State) ->
- lists:foldl(
- fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], State}, MsgSeqNos).
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
- {MXs, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}},
- Nack) ->
- State1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> UQM1 = gb_trees:delete(QPid, UQM),
- State#ch{unconfirmed_qm = UQM1};
- false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State#ch{unconfirmed_qm = UQM1}
- end;
- none ->
- State
- end,
- Qs1 = gb_sets:del_element(QPid, Qs),
- %% If QPid somehow died initiating a nack, clear the message from
- %% internal data-structures. Also, cleanup empty entries.
- case (Nack orelse gb_sets:is_empty(Qs1)) of
- true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
- {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
- false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
- {MXs, State1#ch{unconfirmed_mq = UMQ1}}
- end.
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1147,22 +1110,13 @@ consumer_monitor(ConsumerTag,
State
end.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {Nack, SendFun} =
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {true, fun send_nacks/2};
- false -> {false, fun record_confirms/2}
- end,
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- SendFun(MXs, State2).
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {MXs, UC1} = dtree:take_all(QPid, UC),
+ send_nacks(MXs, State#ch{unconfirmed = UC1});
+ false -> {MXs, UC1} = dtree:take(QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
+ end.
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
@@ -1389,21 +1343,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ} = State,
- UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
- SingletonSet = gb_sets:singleton(MsgSeqNo),
- lists:foldl(
- fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
- case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State0#ch{unconfirmed_qm = UQM1};
- none ->
- UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- State0#ch{unconfirmed_qm = UQM1}
- end
- end, State#ch{unconfirmed_mq = UMQ1}, QPids).
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+ State#ch.unconfirmed)}.
send_nacks([], State) ->
State;
@@ -1441,11 +1382,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UMQ) of
+ CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
+ false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1459,8 +1400,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
State;
-maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) of
+maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
+ case dtree:is_empty(UC) of
false -> State;
true -> complete_tx(State#ch{confirmed = []})
end.
@@ -1488,8 +1429,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
- gb_trees:size(UMQ);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ dtree:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6a775adf..51f88c8f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -328,7 +328,7 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
0 -> Arg ++ ".0";
_ -> Arg
end),
- Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]),
+ Inform("Setting memory threshold on ~p to ~p", [Node, Frac]),
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
new file mode 100644
index 00000000..831d5290
--- /dev/null
+++ b/src/rabbit_disk_monitor.erl
@@ -0,0 +1,196 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_disk_monitor).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0,
+ set_check_interval/1, get_disk_free/0]).
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+
+-record(state, {dir,
+ limit,
+ timeout,
+ timer,
+ alarmed
+ }).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(disk_free_limit() :: (integer() | {'mem_relative', float()})).
+-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()).
+-spec(get_disk_free_limit/0 :: () -> integer()).
+-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok').
+-spec(get_check_interval/0 :: () -> integer()).
+-spec(set_check_interval/1 :: (integer()) -> 'ok').
+-spec(get_disk_free/0 :: () -> (integer() | 'unknown')).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+get_disk_free_limit() ->
+ gen_server:call(?MODULE, get_disk_free_limit, infinity).
+
+set_disk_free_limit(Limit) ->
+ gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
+
+get_check_interval() ->
+ gen_server:call(?MODULE, get_check_interval, infinity).
+
+set_check_interval(Interval) ->
+ gen_server:call(?MODULE, {set_check_interval, Interval}, infinity).
+
+get_disk_free() ->
+ gen_server:call(?MODULE, get_disk_free, infinity).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+init([Limit]) ->
+ TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL),
+ Dir = dir(),
+ State = #state { dir = Dir,
+ timeout = ?DEFAULT_DISK_CHECK_INTERVAL,
+ timer = TRef,
+ alarmed = false},
+ case {catch get_disk_free(Dir),
+ vm_memory_monitor:get_total_memory()} of
+ {N1, N2} when is_integer(N1), is_integer(N2) ->
+ {ok, set_disk_limits(State, Limit)};
+ _ ->
+ rabbit_log:info("Disabling disk free space monitoring "
+ "on unsupported platform~n"),
+ {stop, unsupported_platform}
+ end.
+
+handle_call(get_disk_free_limit, _From, State) ->
+ {reply, interpret_limit(State#state.limit), State};
+
+handle_call({set_disk_free_limit, Limit}, _From, State) ->
+ {reply, ok, set_disk_limits(State, Limit)};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(get_disk_free, _From, State = #state { dir = Dir }) ->
+ {reply, get_disk_free(Dir), State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+%% Server Internals
+%%----------------------------------------------------------------------------
+
+% the partition / drive containing this directory will be monitored
+dir() -> rabbit_mnesia:dir().
+
+set_disk_limits(State, Limit) ->
+ State1 = State#state { limit = Limit },
+ rabbit_log:info("Disk free limit set to ~pMB~n",
+ [trunc(interpret_limit(Limit) / 1048576)]),
+ internal_update(State1).
+
+internal_update(State = #state { limit = Limit,
+ dir = Dir,
+ alarmed = Alarmed}) ->
+ CurrentFreeBytes = get_disk_free(Dir),
+ LimitBytes = interpret_limit(Limit),
+ NewAlarmed = CurrentFreeBytes < LimitBytes,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
+ alarm_handler:set_alarm({{resource_limit, disk, node()}, []});
+ {true, false} ->
+ emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
+ alarm_handler:clear_alarm({resource_limit, disk, node()});
+ _ ->
+ ok
+ end,
+ State #state {alarmed = NewAlarmed}.
+
+get_disk_free(Dir) ->
+ get_disk_free(Dir, os:type()).
+
+get_disk_free(Dir, {unix, Sun})
+ when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
+ parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir));
+get_disk_free(Dir, {unix, _}) ->
+ parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
+get_disk_free(Dir, {win32, _}) ->
+ parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir));
+get_disk_free(_, _) ->
+ unknown.
+
+parse_free_unix(CommandResult) ->
+ [_, Stats | _] = string:tokens(CommandResult, "\n"),
+ [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"),
+ list_to_integer(Free) * 1024.
+
+parse_free_win32(CommandResult) ->
+ LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
+ [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "),
+ list_to_integer(Free).
+
+interpret_limit({mem_relative, R}) ->
+ round(R * vm_memory_monitor:get_total_memory());
+interpret_limit(L) ->
+ L.
+
+emit_update_info(State, CurrentFree, Limit) ->
+ rabbit_log:info(
+ "Disk free space limit now ~s. Free bytes:~p Limit:~p~n",
+ [State, CurrentFree, Limit]).
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:send_interval(Timeout, update),
+ TRef.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 83e28c44..910a89b4 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -242,6 +242,11 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
+%% Optimisation
+route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
+ #delivery{message = #basic_message{routing_keys = RKs}}) ->
+ [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+
route(X = #exchange{name = XName}, Delivery) ->
route1(Delivery, {queue:from_list([X]), XName, []}).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index bfdab487..5db0fa2f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -177,11 +177,12 @@ dropwhile(Pred, MsgFun,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
- Len = BQ:len(BQS),
+ Len = BQ:len(BQS),
BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
- Dropped = Len - BQ:len(BQS1),
+ Len1 = BQ:len(BQS1),
+ ok = gm:broadcast(GM, {set_length, Len1}),
+ Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 }.
@@ -241,11 +242,11 @@ ack(AckTags, State = #state { gm = GM,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f0dff354..404cca2c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -838,7 +838,7 @@ process_instruction({ack, MsgIds},
process_instruction({fold, MsgFun, AckTags},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- BQS1 = BQ:fold(AckTags, MsgFun, BQS),
+ BQS1 = BQ:fold(MsgFun, BQS, AckTags),
{ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ddf7f574..c1be7613 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,8 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
- gb_trees_set_insert/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -60,6 +59,7 @@
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
-export([quit/1]).
+-export([os_cmd/1]).
%%----------------------------------------------------------------------------
@@ -176,7 +176,6 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
--spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
@@ -204,6 +203,7 @@
-spec(multi_call/2 ::
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
+-spec(os_cmd/1 :: (string()) -> string()).
-endif.
@@ -717,15 +717,6 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
-gb_trees_set_insert(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} ->
- Values1 = gb_sets:insert(Value, Values),
- gb_trees:update(Key, Values1, Tree);
- none ->
- gb_trees:insert(Key, gb_sets:singleton(Value), Tree)
- end.
-
gb_trees_fold(Fun, Acc, Tree) ->
gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).
@@ -914,3 +905,10 @@ quit(Status) ->
{unix, _} -> halt(Status);
{win32, _} -> init:stop(Status)
end.
+
+os_cmd(Command) ->
+ Exec = hd(string:tokens(Command, " ")),
+ case os:find_executable(Exec) of
+ false -> throw({command_not_found, Exec});
+ _ -> os:cmd(Command)
+ end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 329c07dc..9a972d9e 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -39,15 +39,15 @@
names(Hostname) ->
Self = self(),
- process_flag(trap_exit, true),
- Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
+ Ref = make_ref(),
+ {Pid, MRef} = spawn_monitor(
+ fun () -> Self ! {Ref, net_adm:names(Hostname)} end),
timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
- Res = receive
- {names, Names} -> Names;
- {'EXIT', Pid, Reason} -> {error, Reason}
- end,
- process_flag(trap_exit, false),
- Res.
+ receive
+ {Ref, Names} -> erlang:demonitor(MRef, [flush]),
+ Names;
+ {'DOWN', MRef, process, Pid, Reason} -> {error, Reason}
+ end.
diagnostics(Nodes) ->
Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 47e796dc..5e9e78d3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/1]).
+-export([conserve_resources/2, server_properties/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -38,7 +38,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_memory,
+ auth_mechanism, auth_state, conserve_resources,
last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -133,8 +133,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
+conserve_resources(Pid, Conserve) ->
+ Pid ! {conserve_resources, Conserve},
ok.
server_properties(Protocol) ->
@@ -218,7 +218,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf_len = 0,
auth_mechanism = none,
auth_state = none,
- conserve_memory = false,
+ conserve_resources = false,
last_blocked_by = none,
last_blocked_at = never},
try
@@ -276,8 +276,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb, State) ->
+ recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -358,8 +358,8 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_memory = Mem}) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_resources = Mem}) ->
case {CS, Mem orelse credit_flow:blocked()} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
@@ -377,9 +377,9 @@ maybe_block(State = #v1{connection_state = blocking}) ->
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_memory = true}) ->
- State#v1{last_blocked_by = mem};
-update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+update_last_blocked_by(State = #v1{conserve_resources = true}) ->
+ State#v1{last_blocked_by = resource};
+update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
close_connection(State = #v1{queue_collector = Collector,
@@ -701,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- conserve_memory = Conserve}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ conserve_resources = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 0965e3b3..bf2b4798 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -52,22 +52,20 @@ start_child(Mod, Args) ->
start_child(Mod, Mod, Args).
start_child(ChildId, Mod, Args) ->
- {ok, _} = supervisor:start_child(?SERVER,
- {ChildId, {Mod, start_link, Args},
- transient, ?MAX_WAIT, worker, [Mod]}),
- ok.
+ child_reply(supervisor:start_child(?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, ?MAX_WAIT, worker, [Mod]})).
start_restartable_child(Mod) ->
start_restartable_child(Mod, []).
start_restartable_child(Mod, Args) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
- {ok, _} = supervisor:start_child(
- ?SERVER,
- {Name, {rabbit_restartable_sup, start_link,
- [Name, {Mod, start_link, Args}]},
- transient, infinity, supervisor, [rabbit_restartable_sup]}),
- ok.
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {Name, {rabbit_restartable_sup, start_link,
+ [Name, {Mod, start_link, Args}]},
+ transient, infinity, supervisor, [rabbit_restartable_sup]})).
stop_child(ChildId) ->
case supervisor:terminate_child(?SERVER, ChildId) of
@@ -77,3 +75,9 @@ stop_child(ChildId) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
+
+
+%%----------------------------------------------------------------------------
+
+child_reply({ok, _}) -> ok;
+child_reply(X) -> X.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 85fe5426..e356d7ff 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -883,6 +883,8 @@ test_cluster_management() ->
"invalid2@invalid"]),
ok = assert_ram_node(),
+ ok = control_action(reset, []),
+
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
@@ -898,7 +900,6 @@ test_cluster_management2(SecondaryNode) ->
SecondaryNodeS = atom_to_list(SecondaryNode),
%% make a disk node
- ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
ok = assert_disc_node(),
%% make a ram node
@@ -1244,6 +1245,9 @@ test_confirms() ->
},
rabbit_basic:build_content(
#'P_basic'{delivery_mode = 2}, <<"">>)),
+ %% We must not kill the queue before the channel has processed the
+ %% 'publish'.
+ ok = rabbit_channel:flush(Ch),
%% Crash the queue
QPid1 ! boom,
%% Wait for a nack
@@ -2551,7 +2555,7 @@ test_queue_recover() ->
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName)
+ rabbit_amqqueue:internal_delete(QName, QPid1)
end),
passed.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index dc74b2f5..f3a8cacf 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -24,7 +24,7 @@
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--record(wstate, {sock, channel, frame_max, protocol}).
+-record(wstate, {sock, channel, frame_max, protocol, pending}).
-define(HIBERNATE_AFTER, 5000).
@@ -80,7 +80,8 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
- protocol = Protocol}])}.
+ protocol = Protocol,
+ pending = []}])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
{ok,
@@ -88,7 +89,8 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
- protocol = Protocol}])}.
+ protocol = Protocol,
+ pending = []}])}.
mainloop(ReaderPid, State) ->
try
@@ -98,37 +100,41 @@ mainloop(ReaderPid, State) ->
end,
done.
-mainloop1(ReaderPid, State) ->
+mainloop1(ReaderPid, State = #wstate{pending = []}) ->
receive
Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
after ?HIBERNATE_AFTER ->
erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
+ end;
+mainloop1(ReaderPid, State) ->
+ receive
+ Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ after 0 ->
+ ?MODULE:mainloop1(ReaderPid, flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
- ok = internal_send_command_async(MethodRecord, State),
- State;
+ internal_send_command_async(MethodRecord, State);
handle_message({send_command, MethodRecord, Content}, State) ->
- ok = internal_send_command_async(MethodRecord, Content, State),
- State;
+ internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
- ok = internal_send_command_async(MethodRecord, State),
+ State1 = flush(internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
- State;
+ State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
- ok = internal_send_command_async(MethodRecord, Content, State),
+ State1 = flush(internal_send_command_async(MethodRecord, Content, State)),
gen_server:reply(From, ok),
- State;
+ State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
- ok = internal_send_command_async(MethodRecord, State),
+ State1 = internal_send_command_async(MethodRecord, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
- State;
+ State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State) ->
- ok = internal_send_command_async(MethodRecord, Content, State),
+ State1 = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
- State;
+ State1;
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
@@ -184,22 +190,6 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
-%% We optimise delivery of small messages. Content-bearing methods
-%% require at least three frames. Small messages always fit into
-%% that. We hand their frames to the Erlang network functions in one
-%% go, which may lead to somewhat more efficient processing in the
-%% runtime and a greater chance of coalescing into fewer TCP packets.
-%%
-%% By contrast, for larger messages, split across many frames, we want
-%% to allow interleaving of frames on different channels. Hence we
-%% hand them to the Erlang network functions one frame at a time.
-send_frames(Fun, Sock, Frames) when length(Frames) =< 3 ->
- Fun(Sock, Frames);
-send_frames(Fun, Sock, Frames) ->
- lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame);
- (_Frame, Other) -> Other
- end, ok, Frames).
-
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
@@ -209,9 +199,44 @@ internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
Protocol) ->
- ok = send_frames(fun tcp_send/2, Sock,
- assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
+ ok = lists:foldl(fun (Frame, ok) -> tcp_send(Sock, Frame);
+ (_Frame, Other) -> Other
+ end, ok, assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
+
+internal_send_command_async(MethodRecord,
+ State = #wstate{channel = Channel,
+ protocol = Protocol,
+ pending = Pending}) ->
+ Frame = assemble_frame(Channel, MethodRecord, Protocol),
+ maybe_flush(State#wstate{pending = [Frame | Pending]}).
+
+internal_send_command_async(MethodRecord, Content,
+ State = #wstate{channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ pending = Pending}) ->
+ Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax,
+ Protocol),
+ maybe_flush(State#wstate{pending = [Frames | Pending]}).
+
+%% This magic number is the tcp-over-ethernet MSS (1460) minus the
+%% minimum size of a AMQP basic.deliver method frame (24) plus basic
+%% content header (22). The idea is that we want to flush just before
+%% exceeding the MSS.
+-define(FLUSH_THRESHOLD, 1414).
+
+maybe_flush(State = #wstate{pending = Pending}) ->
+ case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
+ true -> flush(State);
+ false -> State
+ end.
+
+flush(State = #wstate{pending = []}) ->
+ State;
+flush(State = #wstate{sock = Sock, pending = Pending}) ->
+ ok = port_cmd(Sock, lists:reverse(Pending)),
+ State#wstate{pending = []}.
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -231,21 +256,6 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(MethodRecord,
- #wstate{sock = Sock,
- channel = Channel,
- protocol = Protocol}) ->
- ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
-
-internal_send_command_async(MethodRecord, Content,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = send_frames(fun port_cmd/2, Sock,
- assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
-
port_cmd(Sock, Data) ->
true = try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index fca55f02..fb184d1a 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -181,10 +181,10 @@ internal_update(State = #state { memory_limit = MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
+ alarm_handler:set_alarm({{resource_limit, memory, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
+ alarm_handler:clear_alarm({resource_limit, memory, node()});
_ ->
ok
end,