diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-11 22:39:35 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-11 22:39:35 +0100 |
commit | 93b9c9f5c282418e66091200490090a82cdfe249 (patch) | |
tree | 946edb229c90fab30808fc4f00295d38e89eaf02 | |
parent | acb9abe971cef1f6dec482d8e351c1b62de24e08 (diff) | |
parent | 797c633e2a56ffed0ae338979471a97bc7f26bbb (diff) | |
download | rabbitmq-server-93b9c9f5c282418e66091200490090a82cdfe249.tar.gz |
merge bug24769
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 6 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 7 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/postrm.in | 9 | ||||
-rw-r--r-- | src/dtree.erl | 160 | ||||
-rw-r--r-- | src/rabbit.erl | 6 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 89 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 125 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 309 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 19 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 103 | ||||
-rw-r--r-- | src/rabbit_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_disk_monitor.erl | 196 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 22 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 16 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 34 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 24 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 110 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 4 |
25 files changed, 803 insertions, 473 deletions
@@ -216,12 +216,12 @@ start-rabbit-on-node: all stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) -set-memory-alarm: all - echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \ +set-resource-alarm: all + echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \ $(ERL_CALL) -clear-memory-alarm: all - echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \ +clear-resource-alarm: all + echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \ $(ERL_CALL) stop-node: diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ee07ebfe..ded3ab48 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -948,7 +948,7 @@ <term>source_kind</term> <listitem><para>The kind of the source of messages to which the binding is attached. Currently always - queue. With non-ASCII characters escaped as in + exchange. With non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> @@ -1074,8 +1074,8 @@ <varlistentry> <term>last_blocked_by</term> <listitem><para>The reason for which this connection - was last blocked. One of 'mem' - due to a memory - alarm, 'flow' - due to internal flow control, or + was last blocked. One of 'resource' - due to a memory + or disk alarm, 'flow' - due to internal flow control, or 'none' if the connection was never blocked.</para></listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index fd19051d..b7d14f20 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -19,6 +19,7 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, + {disk_free_limit, {mem_relative, 1.0}}, {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, {frame_max, 131072}, diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index bf93baba..79d44e1b 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,7 +25,8 @@ -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). --type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok')). +-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | + 'undefined'). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -45,8 +46,8 @@ -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/3 :: - (fun ((rabbit_types:message_properties()) -> boolean()), - msg_fun() | 'undefined', state()) + (fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), + state()) -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index fb02cd6a..e935acf5 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,6 +2,8 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: RabbitMQ Team <packaging@rabbitmq.com> +Uploader: Emile Joubert <emile@rabbitmq.com> +DM-Upload-Allowed: yes Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip Standards-Version: 3.8.0 diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in index baf081fc..c2e9bbfe 100644 --- a/packaging/debs/Debian/debian/postrm.in +++ b/packaging/debs/Debian/debian/postrm.in @@ -35,20 +35,15 @@ case "$1" in if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi - remove_plugin_traces + remove_plugin_traces if getent passwd rabbitmq >/dev/null; then # Stop epmd if run by the rabbitmq user pkill -u rabbitmq epmd || : - - deluser rabbitmq - fi - if getent group rabbitmq >/dev/null; then - delgroup rabbitmq fi ;; remove|upgrade) - remove_plugin_traces + remove_plugin_traces ;; failed-upgrade|abort-install|abort-upgrade|disappear) diff --git a/src/dtree.erl b/src/dtree.erl new file mode 100644 index 00000000..67bbbc1b --- /dev/null +++ b/src/dtree.erl @@ -0,0 +1,160 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +%% A dual-index tree. +%% +%% Entries have the following shape: +%% +%% +----+--------------------+---+ +%% | PK | SK1, SK2, ..., SKN | V | +%% +----+--------------------+---+ +%% +%% i.e. a primary key, set of secondary keys, and a value. +%% +%% There can be only one entry per primary key, but secondary keys may +%% appear in multiple entries. +%% +%% The set of secondary keys must be non-empty. Or, to put it another +%% way, entries only exist while their secondary key set is non-empty. + +-module(dtree). + +-export([empty/0, insert/4, take/3, take/2, take_all/2, + is_defined/2, is_empty/1, smallest/1, size/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: {gb_tree(), gb_tree()}). + +-type(pk() :: any()). +-type(sk() :: any()). +-type(val() :: any()). +-type(kv() :: {pk(), val()}). + +-spec(empty/0 :: () -> ?MODULE()). +-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()). +-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). +-spec(smallest/1 :: (?MODULE()) -> kv()). +-spec(size/1 :: (?MODULE()) -> non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +empty() -> {gb_trees:empty(), gb_trees:empty()}. + +%% Insert an entry. Fails if there already is an entry with the given +%% primary key. +insert(PK, [], V, {P, S}) -> + %% dummy insert to force error if PK exists + gb_trees:insert(PK, {gb_sets:empty(), V}, P), + {P, S}; +insert(PK, SKs, V, {P, S}) -> + {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), + lists:foldl(fun (SK, S0) -> + case gb_trees:lookup(SK, S0) of + {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS), + gb_trees:update(SK, PKS1, S0); + none -> PKS = gb_sets:singleton(PK), + gb_trees:insert(SK, PKS, S0) + end + end, S, SKs)}. + +%% Remove the given secondary key from the entries of the given +%% primary keys, returning the primary-key/value pairs of any entries +%% that were dropped as the result (i.e. due to their secondary key +%% set becoming empty). It is ok for the given primary keys and/or +%% secondary key to not exist. +take(PKs, SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> TakenPKS = gb_sets:from_list(PKs), + PKSInter = gb_sets:intersection(PKS, TakenPKS), + PKSDiff = gb_sets:difference (PKS, TakenPKS), + {KVs, P1} = take2(PKSInter, SK, P), + {KVs, {P1, case gb_sets:is_empty(PKSDiff) of + true -> gb_trees:delete(SK, S); + false -> gb_trees:update(SK, PKSDiff, S) + end}} + end. + +%% Remove the given secondary key from all entries, returning the +%% primary-key/value pairs of any entries that were dropped as the +%% result (i.e. due to their secondary key set becoming empty). It is +%% ok for the given secondary key to not exist. +take(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, P1} = take2(PKS, SK, P), + {KVs, {P1, gb_trees:delete(SK, S)}} + end. + +%% Drop all entries which contain the given secondary key, returning +%% the primary-key/value pairs of these entries. It is ok for the +%% given secondary key to not exist. +take_all(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P), + {KVs, {P1, prune(SKS, PKS, S)}} + end. + +is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). + +is_empty({P, _S}) -> gb_trees:is_empty(P). + +smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P), + {K, V}. + +size({P, _S}) -> gb_trees:size(P). + +%%---------------------------------------------------------------------------- + +take2(PKS, SK, P) -> + gb_sets:fold(fun (PK, {KVs, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + SKS1 = gb_sets:delete(SK, SKS), + case gb_sets:is_empty(SKS1) of + true -> KVs1 = [{PK, V} | KVs], + {KVs1, gb_trees:delete(PK, P0)}; + false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} + end + end, {[], P}, PKS). + +take_all2(PKS, P) -> + gb_sets:fold(fun (PK, {KVs, SKS0, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + {[{PK, V} | KVs], gb_sets:union(SKS, SKS0), + gb_trees:delete(PK, P0)} + end, {[], gb_sets:empty(), P}, PKS). + +prune(SKS, PKS, S) -> + gb_sets:fold(fun (SK0, S0) -> + PKS1 = gb_trees:get(SK0, S0), + PKS2 = gb_sets:difference(PKS1, PKS), + case gb_sets:is_empty(PKS2) of + true -> gb_trees:delete(SK0, S0); + false -> gb_trees:update(SK0, PKS2, S0) + end + end, S, SKS). diff --git a/src/rabbit.erl b/src/rabbit.erl index dd5fb89c..eec7e34e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -327,7 +327,11 @@ status() -> [{vm_memory_high_watermark, {vm_memory_monitor, get_vm_memory_high_watermark, []}}, {vm_memory_limit, {vm_memory_monitor, - get_memory_limit, []}}]), + get_memory_limit, []}}, + {disk_free_limit, {rabbit_disk_monitor, + get_disk_free_limit, []}}, + {disk_free, {rabbit_disk_monitor, + get_disk_free, []}}]), S3 = rabbit_misc:with_exit_handler( fun () -> [] end, fun () -> [{file_descriptors, file_handle_cache:info()}] end), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 187ec1ab..04e0c141 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --export([remote_conserve_memory/2]). %% Internal use only +-export([remote_conserve_resources/3]). %% Internal use only -record(alarms, {alertees, alarmed_nodes}). @@ -45,6 +45,9 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), + + {ok, DiskLimit} = application:get_env(disk_free_limit), + rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), ok. stop() -> @@ -61,41 +64,41 @@ on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). %% Can't use alarm_handler:{set,clear}_alarm because that doesn't %% permit notifying a remote node. -remote_conserve_memory(Pid, true) -> +remote_conserve_resources(Pid, Source, true) -> gen_event:notify({alarm_handler, node(Pid)}, - {set_alarm, {{vm_memory_high_watermark, node()}, []}}); -remote_conserve_memory(Pid, false) -> + {set_alarm, {{resource_limit, Source, node()}, []}}); +remote_conserve_resources(Pid, Source, false) -> gen_event:notify({alarm_handler, node(Pid)}, - {clear_alarm, {vm_memory_high_watermark, node()}}). + {clear_alarm, {resource_limit, Source, node()}}). %%---------------------------------------------------------------------------- init([]) -> {ok, #alarms{alertees = dict:new(), - alarmed_nodes = sets:new()}}. + alarmed_nodes = dict:new()}}. handle_call({register, Pid, HighMemMFA}, State) -> - {ok, 0 < sets:size(State#alarms.alarmed_nodes), + {ok, 0 < dict:size(State#alarms.alarmed_nodes), internal_register(Pid, HighMemMFA, State)}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) -> - {ok, maybe_alert(fun sets:add_element/2, Node, State)}; +handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) -> + {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; -handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; +handle_event({clear_alarm, {resource_limit, Source, Node}}, State) -> + {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. ok = gen_event:notify( {alarm_handler, Node}, - {register, self(), {?MODULE, remote_conserve_memory, []}}), + {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; handle_event({register, Pid, HighMemMFA}, State) -> {ok, internal_register(Pid, HighMemMFA, State)}; @@ -118,34 +121,58 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN, - alertees = Alertees}) -> - AN1 = SetFun(Node, AN), - BeforeSz = sets:size(AN), - AfterSz = sets:size(AN1), +dict_unappend_all(Key, _Val, Dict) -> + dict:erase(Key, Dict). + +dict_unappend(Key, Val, Dict) -> + case lists:delete(Val, dict:fetch(Key, Dict)) of + [] -> dict:erase(Key, Dict); + X -> dict:store(Key, X, Dict) + end. + +count_dict_values(Val, Dict) -> + dict:fold(fun (_Node, List, Count) -> + Count + case lists:member(Val, List) of + true -> 1; + false -> 0 + end + end, 0, Dict). + +maybe_alert(UpdateFun, Node, Source, + State = #alarms{alarmed_nodes = AN, + alertees = Alertees}) -> + AN1 = UpdateFun(Node, Source, AN), + BeforeSz = count_dict_values(Source, AN), + AfterSz = count_dict_values(Source, AN1), + %% If we have changed our alarm state, inform the remotes. IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees); - IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees); - true -> ok + if IsLocal andalso BeforeSz < AfterSz -> + ok = alert_remote(true, Alertees, Source); + IsLocal andalso BeforeSz > AfterSz -> + ok = alert_remote(false, Alertees, Source); + true -> + ok end, %% If the overall alarm state has changed, inform the locals. - case {BeforeSz, AfterSz} of - {0, 1} -> ok = alert_local(true, Alertees); - {1, 0} -> ok = alert_local(false, Alertees); + case {dict:size(AN), dict:size(AN1)} of + {0, 1} -> ok = alert_local(true, Alertees, Source); + {1, 0} -> ok = alert_local(false, Alertees, Source); {_, _} -> ok end, State#alarms{alarmed_nodes = AN1}. -alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2). +alert_local(Alert, Alertees, _Source) -> + alert(Alertees, [Alert], fun erlang:'=:='/2). -alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2). +alert_remote(Alert, Alertees, Source) -> + alert(Alertees, [Source, Alert], fun erlang:'=/='/2). -alert(Alert, Alertees, NodeComparator) -> +alert(Alertees, AlertArg, NodeComparator) -> Node = node(), dict:fold(fun (Pid, {M, F, A}, ok) -> case NodeComparator(Node, node(Pid)) of - true -> apply(M, F, A ++ [Pid, Alert]); + true -> apply(M, F, A ++ [Pid] ++ AlertArg); false -> ok end end, ok, Alertees). @@ -153,9 +180,9 @@ alert(Alert, Alertees, NodeComparator) -> internal_register(Pid, {M, F, A} = HighMemMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), - case sets:is_element(node(), State#alarms.alarmed_nodes) of - true -> ok = apply(M, F, A ++ [Pid, true]); - false -> ok + case dict:find(node(), State#alarms.alarmed_nodes) of + {ok, _Sources} -> apply(M, F, A ++ [Pid, true]); + error -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertees), State#alarms{alertees = NewAlertees}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7c20f67d..c1673504 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,7 +32,7 @@ %% internal --export([internal_declare/2, internal_delete/1, run_backing_queue/3, +-export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -144,11 +144,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/1 :: - (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/2 :: + (name(), pid()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -231,7 +231,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), + false -> TailFun = internal_delete(QueueName, QPid), fun () -> TailFun(), ExistingQ end end end @@ -330,54 +330,60 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of - ok -> ok; - {error, Error} -> rabbit_misc:protocol_error( - precondition_failed, - "invalid arg '~s' for ~s: ~255p", - [Key, rabbit_misc:rs(QueueName), Error]) - end || - {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/2}, - {<<"x-message-ttl">>, fun check_integer_argument/2}, - {<<"x-ha-policy">>, fun check_ha_policy_argument/2}, - {<<"x-dead-letter-exchange">>, fun check_string_argument/2}, - {<<"x-dead-letter-routing-key">>, - fun check_dlxrk_argument/2}]], + Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, + {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, + {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, + {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], + [case rabbit_misc:table_lookup(Args, Key) of + undefined -> ok; + TypeVal -> case Fun(TypeVal, Args) of + ok -> ok; + {error, Error} -> rabbit_misc:protocol_error( + precondition_failed, + "invalid arg '~s' for ~s: ~255p", + [Key, rabbit_misc:rs(QueueName), + Error]) + end + end || {Key, Fun} <- Checks], ok. -check_string_argument(undefined, _Args) -> - ok; -check_string_argument({longstr, _}, _Args) -> +check_string_arg({longstr, _}, _Args) -> ok; -check_string_argument({Type, _}, _) -> +check_string_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. -check_integer_argument(undefined, _Args) -> - ok; -check_integer_argument({Type, Val}, _Args) when Val > 0 -> +check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} - end; -check_integer_argument({_Type, Val}, _Args) -> - {error, {value_zero_or_less, Val}}. + end. -check_dlxrk_argument(undefined, _Args) -> - ok; -check_dlxrk_argument({longstr, _}, Args) -> +check_positive_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val > 0 -> ok; + ok -> {error, {value_zero_or_less, Val}}; + Error -> Error + end. + +check_non_neg_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_less_than_zero, Val}}; + Error -> Error + end. + +check_dlxrk_arg({longstr, _}, Args) -> case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of undefined -> {error, routing_key_but_no_dlx_defined}; _ -> ok end; -check_dlxrk_argument({Type, _}, _Args) -> +check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_argument(undefined, _Args) -> - ok; -check_ha_policy_argument({longstr, <<"all">>}, _Args) -> +check_ha_policy_arg({longstr, <<"all">>}, _Args) -> ok; -check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> +check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of undefined -> {error, {require, 'x-ha-policy-params'}}; @@ -393,9 +399,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> {Type, _} -> {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} end; -check_ha_policy_argument({longstr, Policy}, _Args) -> +check_ha_policy_arg({longstr, Policy}, _Args) -> {error, {invalid_ha_policy, Policy}}; -check_ha_policy_argument({Type, _}, _Args) -> +check_ha_policy_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. list() -> @@ -535,13 +541,19 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName) -> +internal_delete(QueueName, QPid) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - rabbit_binding:process_deletions(Deletions) + T = rabbit_binding:process_deletions(Deletions), + fun() -> + ok = T(), + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QueueName}]) + end end end). @@ -556,14 +568,25 @@ set_maximum_since_use(QPid, Age) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid, - slave_pids = []} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])), - rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), Dels)) + fun () -> QsDels = + qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + #amqqueue{name = QName, pid = Pid, + slave_pids = []} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])), + {Qs, Dels} = lists:unzip(QsDels), + T = rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)), + fun () -> + T(), + lists:foreach( + fun({QName, QPid}) -> + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QName}]) + end, Qs) + end end). delete_queue(QueueName) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 61794fd0..f03f40a2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,9 +24,6 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MESSAGE_PROPERTIES, - #message_properties{expiry = undefined, needs_confirming = false}). - -export([start_link/1, info_keys/0]). -export([init_with_backing_queue_state/8]). @@ -52,8 +49,7 @@ ttl_timer_ref, senders, publish_seqno, - unconfirmed_mq, - unconfirmed_qm, + unconfirmed, delayed_stop, queue_monitors, dlx, @@ -140,8 +136,7 @@ init(Q) -> dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty()}, @@ -167,8 +162,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, ttl = undefined, senders = Senders, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), msg_id_to_channel = MTC}, @@ -186,16 +180,13 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate(Reason, State = #q{q = #amqqueue{name = QName}, backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - terminate_shutdown(fun (BQS) -> - rabbit_event:notify( - queue_deleted, [{pid, self()}, - {name, QName}]), - BQS1 = BQ:delete_and_terminate(Reason, BQS), - %% don't care if the internal delete - %% doesn't return 'ok'. - rabbit_amqqueue:internal_delete(qname(State)), - BQS1 - end, State). + terminate_shutdown( + fun (BQS) -> + BQS1 = BQ:delete_and_terminate(Reason, BQS), + %% don't care if the internal delete doesn't return 'ok'. + rabbit_amqqueue:internal_delete(QName, self()), + BQS1 + end, State). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -336,12 +327,10 @@ ensure_expiry_timer(State = #q{expires = undefined}) -> State; ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of - true -> - NewState = stop_expiry_timer(State), - TRef = erlang:send_after(Expires, self(), maybe_expire), - NewState#q{expiry_timer_ref = TRef}; - false -> - State + true -> NewState = stop_expiry_timer(State), + TRef = erlang:send_after(Expires, self(), maybe_expire), + NewState#q{expiry_timer_ref = TRef}; + false -> State end. ensure_stats_timer(State) -> @@ -501,8 +490,10 @@ should_confirm_message(#delivery{sender = SenderPid, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(_Delivery, _State) -> - immediately. +should_confirm_message(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, + _State) -> + {immediately, SenderPid, MsgSeqNo}. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. @@ -511,6 +502,9 @@ maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> State#q{msg_id_to_channel = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; +maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + State; maybe_record_confirm_message(_Confirm, State) -> State. @@ -522,58 +516,50 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = SenderPid, - message = Message, - msg_seq_no = MsgSeqNo}, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case Confirm of - immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); - _ -> ok - end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - DeliverFun = - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - %% we don't need an expiry here because - %% messages are not being enqueued, so we use - %% an empty message_properties. - {AckTag, BQS3} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = needs_confirming(Confirm)}, - SenderPid, BQS2), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS3}} - end, - {Delivered, State2} = - deliver_msgs_to_consumers(DeliverFun, false, - State#q{backing_queue_state = BQS1}), - {Delivered, Confirm, State2}; + deliver_msgs_to_consumers( + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then %% it must have been seen under the same circumstances as %% now: i.e. if it is now a deliver_immediately then it %% must have been before. - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + {case Duplicate of + published -> true; + discarded -> false + end, + State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), - case Delivered of - true -> State2; - false -> Props = (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) +deliver_or_enqueue(Delivery = #delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, State) -> + Confirm = should_confirm_message(Delivery, State), + case attempt_delivery(Delivery, Confirm, State) of + {true, State1} -> + maybe_record_confirm_message(Confirm, State1); + %% the next two are optimisations + {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> + discard_delivery(Delivery, State1); + {false, State1 = #q{ttl = 0, dlx = undefined}} -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + discard_delivery(Delivery, State1); + {false, State1} -> + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + Props = message_properties(Confirm, State2), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -700,8 +686,9 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(#q{ttl=TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL)}. +message_properties(Confirm, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL), + needs_confirming = needs_confirming(Confirm)}. calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). @@ -739,18 +726,15 @@ dead_letter_fun(Reason, _State) -> dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> case rabbit_exchange:lookup(DLX) of - {error, not_found} -> - noreply(State); - _ -> - dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State) + {error, not_found} -> noreply(State); + _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, + State) end. dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed_mq = UMQ, - dlx = DLX, - backing_queue = BQ, - backing_queue_state = BQS}) -> + State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX}) -> {ok, _, QPids} = rabbit_basic:publish( rabbit_basic:delivery( @@ -760,81 +744,36 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, QPids, State#q.queue_monitors), publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS), - cleanup_after_confirm(State1#q{backing_queue_state = BQS1}); - _ -> State2 = - lists:foldl( - fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> - UQM1 = rabbit_misc:gb_trees_set_insert( - QPid, MsgSeqNo, UQM), - State0#q{unconfirmed_qm = UQM1} - end, State1, QPids), - noreply(State2#q{ - unconfirmed_mq = - gb_trees:insert( - MsgSeqNo, {gb_sets:from_list(QPids), - AckTag}, UMQ)}) + [] -> cleanup_after_confirm([AckTag], State1); + _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), + noreply(State1#q{unconfirmed = UC1}) end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed_qm = UQM}) -> + unconfirmed = UC}) -> case pmon:is_monitored(QPid, QMons) of - false -> noreply(State); - true -> rabbit_log:info("DLQ ~p (for ~s) died~n", - [QPid, rabbit_misc:rs(qname(State))]), - State1 = State#q{queue_monitors = pmon:erase(QPid, QMons)}, - case gb_trees:lookup(QPid, UQM) of - none -> - noreply(State1); - {value, MsgSeqNosSet} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> rabbit_log:warning( - "Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]); - false -> ok - end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, - State1) - end + false -> + noreply(State); + true -> + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + rabbit_log:warning( + "DLQ ~p for ~s died with ~p unconfirmed messages~n", + [QPid, rabbit_misc:rs(qname(State)), length(Lost)]); + false -> ok + end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = pmon:erase(QPid, QMons), + unconfirmed = UC1}) end. -handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags1, UMQ3} = - lists:foldl( - fun (MsgSeqNo, {AckTags, UMQ1}) -> - {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), - QPids1 = gb_sets:delete(QPid, QPids), - case gb_sets:is_empty(QPids1) of - true -> {[AckTag | AckTags], - gb_trees:delete(MsgSeqNo, UMQ1)}; - false -> {AckTags, gb_trees:update( - MsgSeqNo, {QPids1, AckTag}, UMQ1)} - end - end, {[], UMQ}, MsgSeqNos), - {_Guids, BQS1} = BQ:ack(AckTags1, BQS), - MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), - gb_sets:from_list(MsgSeqNos)), - State1 = case gb_sets:is_empty(MsgSeqNos1) of - false -> State#q{ - unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)}; - true -> State#q{ - queue_monitors = - pmon:demonitor(QPid, State#q.queue_monitors), - unconfirmed_qm = - gb_trees:delete(QPid, UQM)} - end, - cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, - backing_queue_state = BQS1}). - stop_later(Reason, State) -> stop_later(Reason, undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> - case {gb_trees:is_empty(UMQ), Reply} of +stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> + case {dtree:is_empty(UC), Reply} of {true, noreply} -> {stop, Reason, State}; {true, _} -> @@ -843,16 +782,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) end. -cleanup_after_confirm(State = #q{delayed_stop = DS, - unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso DS =/= undefined of +cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, + unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of {_, {_, noreply}} -> ok; {_, {From, Reply}} -> gen_server2:reply(From, Reply) end, {Reason, _} = DS, - {stop, Reason, State}; - false -> noreply(State) + {stop, Reason, State1}; + false -> noreply(State1) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -885,29 +828,33 @@ make_dead_letter_msg(DLX, Reason, exchange_name = Exchange, routing_keys = RoutingKeys}, State = #q{dlx_routing_key = DlxRoutingKey}) -> - Headers = rabbit_basic:extract_headers(Content), - #resource{name = QName} = qname(State), - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], - Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers), - {DeathRoutingKeys, Headers2} = + {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of - undefined -> {RoutingKeys, Headers1}; + undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], - lists:keydelete(<<"CC">>, 1, Headers1)} + fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} + end, + #resource{name = QName} = qname(State), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RoutingKeys1 = + [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + Info = [{<<"reason">>, + longstr, list_to_binary(atom_to_list(Reason))}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, + [{longstr, Key} || Key <- RoutingKeys1]}], + HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, + Info, Headers)) end, - Content1 = rabbit_basic:replace_headers(Headers2, Content), + Content1 = rabbit_basic:map_headers(HeadersFun2, Content), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), routing_keys = DeathRoutingKeys, content = Content1}. - now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> @@ -1098,7 +1045,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); false -> discard_delivery(Delivery, State1) @@ -1233,8 +1181,15 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State) -> - handle_confirm(MsgSeqNos, QPid, State); +handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> + {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), + State1 = case dtree:is_defined(QPid, UC1) of + false -> QMons = State#q.queue_monitors, + State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; + true -> State + end, + cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State1#q{unconfirmed = UC1}); handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); @@ -1271,14 +1226,14 @@ handle_cast({ack, AckTags, ChPid}, State) -> handle_cast({reject, AckTags, Requeue, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case Requeue of - true -> requeue_and_run(AckTags, State1); - false -> Fun = dead_letter_fun(rejected, State), + case Requeue of + true -> fun (State1) -> requeue_and_run(AckTags, State1) end; + false -> Fun = dead_letter_fun(rejected, State), + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> BQS1 = BQ:fold(Fun, BQS, AckTags), State1#q{backing_queue_state = BQS1} - end + end end)); handle_cast(delete_immediately, State) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 25485ca0..8ad59016 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,7 @@ -export([publish/4, publish/6, publish/1, message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, replace_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/4, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -63,8 +63,8 @@ -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). --spec(replace_headers/2 :: (headers(), rabbit_types:content()) - -> rabbit_types:content()). +-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) + -> rabbit_types:content()). -spec(header_routes/1 :: (undefined | rabbit_framing:amqp_table()) -> [string()]). @@ -193,15 +193,18 @@ extract_headers(Content) -> rabbit_binary_parser:ensure_content_decoded(Content), Headers. -replace_headers(Headers, Content = #content{properties = Props}) -> +map_headers(F, Content) -> + Content1 = rabbit_binary_parser:ensure_content_decoded(Content), + #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, + Headers1 = F(Headers), rabbit_binary_generator:clear_encoded_content( - Content#content{properties = Props#'P_basic'{headers = Headers}}). + Content1#content{properties = Props#'P_basic'{headers = Headers1}}). indexof(L, Element) -> indexof(L, Element, 1). -indexof([], _Element, _N) -> 0; -indexof([Element | _Rest], Element, N) -> N; -indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). +indexof([], _Element, _N) -> 0; +indexof([Element | _Rest], Element, N) -> N; +indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 081af766..2245cf65 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,8 +37,8 @@ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + stats_timer, confirm_enabled, publish_seqno, unconfirmed, + confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, @@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), - record_confirms(MXs, State1). - -process_confirms(MsgSeqNos, QPid, Nack, State) -> - lists:foldl( - fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); - none -> Acc - end - end, {[], State}, MsgSeqNos). - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, - {MXs, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}}, - Nack) -> - State1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> UQM1 = gb_trees:delete(QPid, UQM), - State#ch{unconfirmed_qm = UQM1}; - false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State#ch{unconfirmed_qm = UQM1} - end; - none -> - State - end, - Qs1 = gb_sets:del_element(QPid, Qs), - %% If QPid somehow died initiating a nack, clear the message from - %% internal data-structures. Also, cleanup empty entries. - case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), - {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; - false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), - {MXs, State1#ch{unconfirmed_mq = UMQ1}} - end. +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1147,22 +1110,13 @@ consumer_monitor(ConsumerTag, State end. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] - end, - %% We remove the MsgSeqNos from UQM before calling - %% process_confirms to prevent each MsgSeqNo being removed from - %% the set one by one which which would be inefficient - State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {Nack, SendFun} = - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {true, fun send_nacks/2}; - false -> {false, fun record_confirms/2} - end, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - SendFun(MXs, State2). +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {MXs, UC1} = dtree:take_all(QPid, UC), + send_nacks(MXs, State#ch{unconfirmed = UC1}); + false -> {MXs, UC1} = dtree:take(QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, @@ -1389,21 +1343,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ} = State, - UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), - SingletonSet = gb_sets:singleton(MsgSeqNo), - lists:foldl( - fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> - case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State0#ch{unconfirmed_qm = UQM1}; - none -> - UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - State0#ch{unconfirmed_qm = UQM1} - end - end, State#ch{unconfirmed_mq = UMQ1}, QPids). + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, + State#ch.unconfirmed)}. send_nacks([], State) -> State; @@ -1441,11 +1382,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UMQ) of + CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo + false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1459,8 +1400,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, maybe_complete_tx(State = #ch{tx_status = in_progress}) -> State; -maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) of +maybe_complete_tx(State = #ch{unconfirmed = UC}) -> + case dtree:is_empty(UC) of false -> State; true -> complete_tx(State#ch{confirmed = []}) end. @@ -1488,8 +1429,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> - gb_trees:size(UMQ); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> + dtree:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6a775adf..51f88c8f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -328,7 +328,7 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> 0 -> Arg ++ ".0"; _ -> Arg end), - Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]), + Inform("Setting memory threshold on ~p to ~p", [Node, Frac]), rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl new file mode 100644 index 00000000..831d5290 --- /dev/null +++ b/src/rabbit_disk_monitor.erl @@ -0,0 +1,196 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_disk_monitor). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0, + set_check_interval/1, get_disk_free/0]). + +-define(SERVER, ?MODULE). +-define(DEFAULT_DISK_CHECK_INTERVAL, 60000). + +-record(state, {dir, + limit, + timeout, + timer, + alarmed + }). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(disk_free_limit() :: (integer() | {'mem_relative', float()})). +-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()). +-spec(get_disk_free_limit/0 :: () -> integer()). +-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok'). +-spec(get_check_interval/0 :: () -> integer()). +-spec(set_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_disk_free/0 :: () -> (integer() | 'unknown')). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +get_disk_free_limit() -> + gen_server:call(?MODULE, get_disk_free_limit, infinity). + +set_disk_free_limit(Limit) -> + gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). + +get_check_interval() -> + gen_server:call(?MODULE, get_check_interval, infinity). + +set_check_interval(Interval) -> + gen_server:call(?MODULE, {set_check_interval, Interval}, infinity). + +get_disk_free() -> + gen_server:call(?MODULE, get_disk_free, infinity). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +init([Limit]) -> + TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL), + Dir = dir(), + State = #state { dir = Dir, + timeout = ?DEFAULT_DISK_CHECK_INTERVAL, + timer = TRef, + alarmed = false}, + case {catch get_disk_free(Dir), + vm_memory_monitor:get_total_memory()} of + {N1, N2} when is_integer(N1), is_integer(N2) -> + {ok, set_disk_limits(State, Limit)}; + _ -> + rabbit_log:info("Disabling disk free space monitoring " + "on unsupported platform~n"), + {stop, unsupported_platform} + end. + +handle_call(get_disk_free_limit, _From, State) -> + {reply, interpret_limit(State#state.limit), State}; + +handle_call({set_disk_free_limit, Limit}, _From, State) -> + {reply, ok, set_disk_limits(State, Limit)}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_disk_free, _From, State = #state { dir = Dir }) -> + {reply, get_disk_free(Dir), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(update, State) -> + {noreply, internal_update(State)}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% Server Internals +%%---------------------------------------------------------------------------- + +% the partition / drive containing this directory will be monitored +dir() -> rabbit_mnesia:dir(). + +set_disk_limits(State, Limit) -> + State1 = State#state { limit = Limit }, + rabbit_log:info("Disk free limit set to ~pMB~n", + [trunc(interpret_limit(Limit) / 1048576)]), + internal_update(State1). + +internal_update(State = #state { limit = Limit, + dir = Dir, + alarmed = Alarmed}) -> + CurrentFreeBytes = get_disk_free(Dir), + LimitBytes = interpret_limit(Limit), + NewAlarmed = CurrentFreeBytes < LimitBytes, + case {Alarmed, NewAlarmed} of + {false, true} -> + emit_update_info("exceeded", CurrentFreeBytes, LimitBytes), + alarm_handler:set_alarm({{resource_limit, disk, node()}, []}); + {true, false} -> + emit_update_info("below limit", CurrentFreeBytes, LimitBytes), + alarm_handler:clear_alarm({resource_limit, disk, node()}); + _ -> + ok + end, + State #state {alarmed = NewAlarmed}. + +get_disk_free(Dir) -> + get_disk_free(Dir, os:type()). + +get_disk_free(Dir, {unix, Sun}) + when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> + parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir)); +get_disk_free(Dir, {unix, _}) -> + parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); +get_disk_free(Dir, {win32, _}) -> + parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir)); +get_disk_free(_, _) -> + unknown. + +parse_free_unix(CommandResult) -> + [_, Stats | _] = string:tokens(CommandResult, "\n"), + [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"), + list_to_integer(Free) * 1024. + +parse_free_win32(CommandResult) -> + LastLine = lists:last(string:tokens(CommandResult, "\r\n")), + [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "), + list_to_integer(Free). + +interpret_limit({mem_relative, R}) -> + round(R * vm_memory_monitor:get_total_memory()); +interpret_limit(L) -> + L. + +emit_update_info(State, CurrentFree, Limit) -> + rabbit_log:info( + "Disk free space limit now ~s. Free bytes:~p Limit:~p~n", + [State, CurrentFree, Limit]). + +start_timer(Timeout) -> + {ok, TRef} = timer:send_interval(Timeout, update), + TRef. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 83e28c44..910a89b4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -242,6 +242,11 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). +%% Optimisation +route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, + #delivery{message = #basic_message{routing_keys = RKs}}) -> + [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + route(X = #exchange{name = XName}, Delivery) -> route1(Delivery, {queue:from_list([X]), XName, []}). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index bfdab487..5db0fa2f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -177,11 +177,12 @@ dropwhile(Pred, MsgFun, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> - Len = BQ:len(BQS), + Len = BQ:len(BQS), BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), - Dropped = Len - BQ:len(BQS1), + Len1 = BQ:len(BQS1), + ok = gm:broadcast(GM, {set_length, Len1}), + Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 }. @@ -241,11 +242,11 @@ ack(AckTags, State = #state { gm = GM, backing_queue_state = BQS, ack_msg_id = AM }) -> {MsgIds, BQS1} = BQ:ack(AckTags, BQS), - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f0dff354..404cca2c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -838,7 +838,7 @@ process_instruction({ack, MsgIds}, process_instruction({fold, MsgFun, AckTags}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQS1 = BQ:fold(AckTags, MsgFun, BQS), + BQS1 = BQ:fold(MsgFun, BQS, AckTags), {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ddf7f574..c1be7613 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,8 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, - gb_trees_set_insert/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -60,6 +59,7 @@ -export([append_rpc_all_nodes/4]). -export([multi_call/2]). -export([quit/1]). +-export([os_cmd/1]). %%---------------------------------------------------------------------------- @@ -176,7 +176,6 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). --spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -204,6 +203,7 @@ -spec(multi_call/2 :: ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). +-spec(os_cmd/1 :: (string()) -> string()). -endif. @@ -717,15 +717,6 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -gb_trees_set_insert(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> - Values1 = gb_sets:insert(Value, Values), - gb_trees:update(Key, Values1, Tree); - none -> - gb_trees:insert(Key, gb_sets:singleton(Value), Tree) - end. - gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). @@ -914,3 +905,10 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. + +os_cmd(Command) -> + Exec = hd(string:tokens(Command, " ")), + case os:find_executable(Exec) of + false -> throw({command_not_found, Exec}); + _ -> os:cmd(Command) + end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 329c07dc..9a972d9e 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -39,15 +39,15 @@ names(Hostname) -> Self = self(), - process_flag(trap_exit, true), - Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), + Ref = make_ref(), + {Pid, MRef} = spawn_monitor( + fun () -> Self ! {Ref, net_adm:names(Hostname)} end), timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), - Res = receive - {names, Names} -> Names; - {'EXIT', Pid, Reason} -> {error, Reason} - end, - process_flag(trap_exit, false), - Res. + receive + {Ref, Names} -> erlang:demonitor(MRef, [flush]), + Names; + {'DOWN', MRef, process, Pid, Reason} -> {error, Reason} + end. diagnostics(Nodes) -> Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 47e796dc..5e9e78d3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/2, server_properties/1]). +-export([conserve_resources/2, server_properties/1]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -38,7 +38,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, conserve_memory, + auth_mechanism, auth_state, conserve_resources, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -133,8 +133,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, +conserve_resources(Pid, Conserve) -> + Pid ! {conserve_resources, Conserve}, ok. server_properties(Protocol) -> @@ -218,7 +218,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf_len = 0, auth_mechanism = none, auth_state = none, - conserve_memory = false, + conserve_resources = false, last_blocked_by = none, last_blocked_at = never}, try @@ -276,8 +276,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); +handle_other({conserve_resources, Conserve}, Deb, State) -> + recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -358,8 +358,8 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -control_throttle(State = #v1{connection_state = CS, - conserve_memory = Mem}) -> +control_throttle(State = #v1{connection_state = CS, + conserve_resources = Mem}) -> case {CS, Mem orelse credit_flow:blocked()} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; @@ -377,9 +377,9 @@ maybe_block(State = #v1{connection_state = blocking}) -> maybe_block(State) -> State. -update_last_blocked_by(State = #v1{conserve_memory = true}) -> - State#v1{last_blocked_by = mem}; -update_last_blocked_by(State = #v1{conserve_memory = false}) -> +update_last_blocked_by(State = #v1{conserve_resources = true}) -> + State#v1{last_blocked_by = resource}; +update_last_blocked_by(State = #v1{conserve_resources = false}) -> State#v1{last_blocked_by = flow}. close_connection(State = #v1{queue_collector = Collector, @@ -701,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - conserve_memory = Conserve}), + State#v1{connection_state = running, + connection = NewConnection, + conserve_resources = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 0965e3b3..bf2b4798 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -52,22 +52,20 @@ start_child(Mod, Args) -> start_child(Mod, Mod, Args). start_child(ChildId, Mod, Args) -> - {ok, _} = supervisor:start_child(?SERVER, - {ChildId, {Mod, start_link, Args}, - transient, ?MAX_WAIT, worker, [Mod]}), - ok. + child_reply(supervisor:start_child(?SERVER, + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]})). start_restartable_child(Mod) -> start_restartable_child(Mod, []). start_restartable_child(Mod, Args) -> Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), - {ok, _} = supervisor:start_child( - ?SERVER, - {Name, {rabbit_restartable_sup, start_link, - [Name, {Mod, start_link, Args}]}, - transient, infinity, supervisor, [rabbit_restartable_sup]}), - ok. + child_reply(supervisor:start_child( + ?SERVER, + {Name, {rabbit_restartable_sup, start_link, + [Name, {Mod, start_link, Args}]}, + transient, infinity, supervisor, [rabbit_restartable_sup]})). stop_child(ChildId) -> case supervisor:terminate_child(?SERVER, ChildId) of @@ -77,3 +75,9 @@ stop_child(ChildId) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. + + +%%---------------------------------------------------------------------------- + +child_reply({ok, _}) -> ok; +child_reply(X) -> X. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 85fe5426..e356d7ff 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -883,6 +883,8 @@ test_cluster_management() -> "invalid2@invalid"]), ok = assert_ram_node(), + ok = control_action(reset, []), + SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = test_cluster_management2(SecondaryNode); @@ -898,7 +900,6 @@ test_cluster_management2(SecondaryNode) -> SecondaryNodeS = atom_to_list(SecondaryNode), %% make a disk node - ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), ok = assert_disc_node(), %% make a ram node @@ -1244,6 +1245,9 @@ test_confirms() -> }, rabbit_basic:build_content( #'P_basic'{delivery_mode = 2}, <<"">>)), + %% We must not kill the queue before the channel has processed the + %% 'publish'. + ok = rabbit_channel:flush(Ch), %% Crash the queue QPid1 ! boom, %% Wait for a nack @@ -2551,7 +2555,7 @@ test_queue_recover() -> {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName) + rabbit_amqqueue:internal_delete(QName, QPid1) end), passed. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index dc74b2f5..f3a8cacf 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -24,7 +24,7 @@ send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --record(wstate, {sock, channel, frame_max, protocol}). +-record(wstate, {sock, channel, frame_max, protocol, pending}). -define(HIBERNATE_AFTER, 5000). @@ -80,7 +80,8 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, - protocol = Protocol}])}. + protocol = Protocol, + pending = []}])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, @@ -88,7 +89,8 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, - protocol = Protocol}])}. + protocol = Protocol, + pending = []}])}. mainloop(ReaderPid, State) -> try @@ -98,37 +100,41 @@ mainloop(ReaderPid, State) -> end, done. -mainloop1(ReaderPid, State) -> +mainloop1(ReaderPid, State = #wstate{pending = []}) -> receive Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) + end; +mainloop1(ReaderPid, State) -> + receive + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + after 0 -> + ?MODULE:mainloop1(ReaderPid, flush(State)) end. handle_message({send_command, MethodRecord}, State) -> - ok = internal_send_command_async(MethodRecord, State), - State; + internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), - State; + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - ok = internal_send_command_async(MethodRecord, State), + State1 = flush(internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), - State; + State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), + State1 = flush(internal_send_command_async(MethodRecord, Content, State)), gen_server:reply(From, ok), - State; + State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> - ok = internal_send_command_async(MethodRecord, State), + State1 = internal_send_command_async(MethodRecord, State), rabbit_amqqueue:notify_sent(QPid, ChPid), - State; + State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), + State1 = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), - State; + State1; handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QPid), State; @@ -184,22 +190,6 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. -%% We optimise delivery of small messages. Content-bearing methods -%% require at least three frames. Small messages always fit into -%% that. We hand their frames to the Erlang network functions in one -%% go, which may lead to somewhat more efficient processing in the -%% runtime and a greater chance of coalescing into fewer TCP packets. -%% -%% By contrast, for larger messages, split across many frames, we want -%% to allow interleaving of frames on different channels. Hence we -%% hand them to the Erlang network functions one frame at a time. -send_frames(Fun, Sock, Frames) when length(Frames) =< 3 -> - Fun(Sock, Frames); -send_frames(Fun, Sock, Frames) -> - lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame); - (_Frame, Other) -> Other - end, ok, Frames). - tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). @@ -209,9 +199,44 @@ internal_send_command(Sock, Channel, MethodRecord, Protocol) -> internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, Protocol) -> - ok = send_frames(fun tcp_send/2, Sock, - assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + ok = lists:foldl(fun (Frame, ok) -> tcp_send(Sock, Frame); + (_Frame, Other) -> Other + end, ok, assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). + +internal_send_command_async(MethodRecord, + State = #wstate{channel = Channel, + protocol = Protocol, + pending = Pending}) -> + Frame = assemble_frame(Channel, MethodRecord, Protocol), + maybe_flush(State#wstate{pending = [Frame | Pending]}). + +internal_send_command_async(MethodRecord, Content, + State = #wstate{channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + pending = Pending}) -> + Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax, + Protocol), + maybe_flush(State#wstate{pending = [Frames | Pending]}). + +%% This magic number is the tcp-over-ethernet MSS (1460) minus the +%% minimum size of a AMQP basic.deliver method frame (24) plus basic +%% content header (22). The idea is that we want to flush just before +%% exceeding the MSS. +-define(FLUSH_THRESHOLD, 1414). + +maybe_flush(State = #wstate{pending = Pending}) -> + case iolist_size(Pending) >= ?FLUSH_THRESHOLD of + true -> flush(State); + false -> State + end. + +flush(State = #wstate{pending = []}) -> + State; +flush(State = #wstate{sock = Sock, pending = Pending}) -> + ok = port_cmd(Sock, lists:reverse(Pending)), + State#wstate{pending = []}. %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -231,21 +256,6 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(MethodRecord, - #wstate{sock = Sock, - channel = Channel, - protocol = Protocol}) -> - ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)). - -internal_send_command_async(MethodRecord, Content, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = send_frames(fun port_cmd/2, Sock, - assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). - port_cmd(Sock, Data) -> true = try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index fca55f02..fb184d1a 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -181,10 +181,10 @@ internal_update(State = #state { memory_limit = MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); + alarm_handler:set_alarm({{resource_limit, memory, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); + alarm_handler:clear_alarm({resource_limit, memory, node()}); _ -> ok end, |