diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-04-02 15:00:33 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-04-02 15:00:33 +0100 |
commit | b46957894bcdcebab47f45ed4ddc8d6faa08f948 (patch) | |
tree | 9318355f2cec2e8cb1d120dc16934861e8301b38 | |
parent | b8f0d90c07bbec23e4ea6dc9d0bfcc5c5cb77e48 (diff) | |
parent | d7caa83278adc50d18e36cb633524f2991d4831d (diff) | |
download | rabbitmq-server-b46957894bcdcebab47f45ed4ddc8d6faa08f948.tar.gz |
Merge bug24808.
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 6 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 7 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/Makefile | 4 | ||||
-rw-r--r-- | packaging/windows/Makefile | 5 | ||||
-rw-r--r-- | src/credit_flow.erl | 3 | ||||
-rw-r--r-- | src/dtree.erl | 111 | ||||
-rw-r--r-- | src/rabbit.erl | 6 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 89 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 116 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 194 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 107 | ||||
-rw-r--r-- | src/rabbit_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_disk_monitor.erl | 196 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 22 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 16 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 34 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 24 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 4 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 110 | ||||
-rw-r--r-- | src/supervisor2.erl | 13 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 4 |
25 files changed, 676 insertions, 414 deletions
@@ -216,12 +216,12 @@ start-rabbit-on-node: all stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) -set-memory-alarm: all - echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \ +set-resource-alarm: all + echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \ $(ERL_CALL) -clear-memory-alarm: all - echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \ +clear-resource-alarm: all + echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \ $(ERL_CALL) stop-node: diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ee07ebfe..ded3ab48 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -948,7 +948,7 @@ <term>source_kind</term> <listitem><para>The kind of the source of messages to which the binding is attached. Currently always - queue. With non-ASCII characters escaped as in + exchange. With non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> @@ -1074,8 +1074,8 @@ <varlistentry> <term>last_blocked_by</term> <listitem><para>The reason for which this connection - was last blocked. One of 'mem' - due to a memory - alarm, 'flow' - due to internal flow control, or + was last blocked. One of 'resource' - due to a memory + or disk alarm, 'flow' - due to internal flow control, or 'none' if the connection was never blocked.</para></listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index fd19051d..b7d14f20 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -19,6 +19,7 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, + {disk_free_limit, {mem_relative, 1.0}}, {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, {frame_max, 131072}, diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index bf93baba..79d44e1b 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,7 +25,8 @@ -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). --type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok')). +-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | + 'undefined'). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -45,8 +46,8 @@ -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/3 :: - (fun ((rabbit_types:message_properties()) -> boolean()), - msg_fun() | 'undefined', state()) + (fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), + state()) -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index f1925757..234fc2c7 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -15,9 +15,11 @@ endif ifeq "$(RPM_OS)" "suse" REQUIRES=/sbin/chkconfig /sbin/service OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse' +START_PROG=setsid else REQUIRES=chkconfig initscripts OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' +START_PROG=runuser rabbitmq --session-command endif rpms: clean server @@ -32,7 +34,7 @@ prepare: cp ${COMMON_DIR}/* SOURCES/ sed -i \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ - -e 's|^START_PROG=.*$$|START_PROG="runuser rabbitmq --session-command"|' \ + -e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \ SOURCES/rabbitmq-server.init ifeq "$(RPM_OS)" "fedora" # Fedora says that only vital services should have Default-Start diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index a910941b..1c222162 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -8,10 +8,7 @@ dist: $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmq-plugins.bat $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/*.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile $(SOURCE_DIR)/*mk rm -f $(SOURCE_DIR)/README diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 072f4d9d..ba99811f 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -96,7 +96,8 @@ peer_down(Peer) -> %% credit_deferred and thus send messages into the void... unblock(Peer), erase({credit_from, Peer}), - erase({credit_to, Peer}). + erase({credit_to, Peer}), + ok. %% -------------------------------------------------------------------------- diff --git a/src/dtree.erl b/src/dtree.erl new file mode 100644 index 00000000..265bb340 --- /dev/null +++ b/src/dtree.erl @@ -0,0 +1,111 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +%% A dual-index tree. +%% +%% Conceptually, what we want is a map that has two distinct sets of +%% keys (referred to here as primary and secondary, although that +%% shouldn't imply a hierarchy) pointing to one set of +%% values. However, in practice what we'll always want to do is insert +%% a value that's pointed at by (one primary, many secondaries) and +%% remove values that are pointed at by (one secondary, many +%% primaries) or (one secondary, all primaries). Thus the API. +%% +%% Entries exists while they have a non-empty secondary key set. The +%% 'take' operations return the entries that got removed, i.e. that +%% had no remaining secondary keys. take/3 expects entries to exist +%% with the supplied primary keys and secondary key. take/2 can cope +%% with the supplied secondary key having no entries. + +-module(dtree). + +-export([empty/0, insert/4, take/3, take/2, + is_defined/2, is_empty/1, smallest/1, size/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: {gb_tree(), gb_tree()}). + +-type(pk() :: any()). +-type(sk() :: any()). +-type(val() :: any()). +-type(kv() :: {pk(), val()}). + +-spec(empty/0 :: () -> ?MODULE()). +-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()). +-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). +-spec(smallest/1 :: (?MODULE()) -> kv()). +-spec(size/1 :: (?MODULE()) -> non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +empty() -> {gb_trees:empty(), gb_trees:empty()}. + +insert(PK, SKs, V, {P, S}) -> + {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), + lists:foldl(fun (SK, S0) -> + case gb_trees:lookup(SK, S0) of + {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS), + gb_trees:update(SK, PKS1, S0); + none -> PKS = gb_sets:singleton(PK), + gb_trees:insert(SK, PKS, S0) + end + end, S, SKs)}. + +take(PKs, SK, {P, S}) -> + {KVs, P1} = take2(PKs, SK, P), + PKS = gb_sets:difference(gb_trees:get(SK, S), gb_sets:from_list(PKs)), + {KVs, {P1, case gb_sets:is_empty(PKS) of + true -> gb_trees:delete(SK, S); + false -> gb_trees:update(SK, PKS, S) + end}}. + +take(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, P1} = take2(gb_sets:to_list(PKS), SK, P), + {KVs, {P1, gb_trees:delete(SK, S)}} + end. + +is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). + +is_empty({P, _S}) -> gb_trees:is_empty(P). + +smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P), + {K, V}. + +size({P, _S}) -> gb_trees:size(P). + +%%---------------------------------------------------------------------------- + +take2(PKs, SK, P) -> + lists:foldl(fun (PK, {KVs, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + SKS1 = gb_sets:delete(SK, SKS), + case gb_sets:is_empty(SKS1) of + true -> {[{PK, V} | KVs], gb_trees:delete(PK, P0)}; + false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} + end + end, {[], P}, PKs). diff --git a/src/rabbit.erl b/src/rabbit.erl index dd5fb89c..eec7e34e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -327,7 +327,11 @@ status() -> [{vm_memory_high_watermark, {vm_memory_monitor, get_vm_memory_high_watermark, []}}, {vm_memory_limit, {vm_memory_monitor, - get_memory_limit, []}}]), + get_memory_limit, []}}, + {disk_free_limit, {rabbit_disk_monitor, + get_disk_free_limit, []}}, + {disk_free, {rabbit_disk_monitor, + get_disk_free, []}}]), S3 = rabbit_misc:with_exit_handler( fun () -> [] end, fun () -> [{file_descriptors, file_handle_cache:info()}] end), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 187ec1ab..04e0c141 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --export([remote_conserve_memory/2]). %% Internal use only +-export([remote_conserve_resources/3]). %% Internal use only -record(alarms, {alertees, alarmed_nodes}). @@ -45,6 +45,9 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), + + {ok, DiskLimit} = application:get_env(disk_free_limit), + rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), ok. stop() -> @@ -61,41 +64,41 @@ on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). %% Can't use alarm_handler:{set,clear}_alarm because that doesn't %% permit notifying a remote node. -remote_conserve_memory(Pid, true) -> +remote_conserve_resources(Pid, Source, true) -> gen_event:notify({alarm_handler, node(Pid)}, - {set_alarm, {{vm_memory_high_watermark, node()}, []}}); -remote_conserve_memory(Pid, false) -> + {set_alarm, {{resource_limit, Source, node()}, []}}); +remote_conserve_resources(Pid, Source, false) -> gen_event:notify({alarm_handler, node(Pid)}, - {clear_alarm, {vm_memory_high_watermark, node()}}). + {clear_alarm, {resource_limit, Source, node()}}). %%---------------------------------------------------------------------------- init([]) -> {ok, #alarms{alertees = dict:new(), - alarmed_nodes = sets:new()}}. + alarmed_nodes = dict:new()}}. handle_call({register, Pid, HighMemMFA}, State) -> - {ok, 0 < sets:size(State#alarms.alarmed_nodes), + {ok, 0 < dict:size(State#alarms.alarmed_nodes), internal_register(Pid, HighMemMFA, State)}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) -> - {ok, maybe_alert(fun sets:add_element/2, Node, State)}; +handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) -> + {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; -handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; +handle_event({clear_alarm, {resource_limit, Source, Node}}, State) -> + {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. ok = gen_event:notify( {alarm_handler, Node}, - {register, self(), {?MODULE, remote_conserve_memory, []}}), + {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; handle_event({register, Pid, HighMemMFA}, State) -> {ok, internal_register(Pid, HighMemMFA, State)}; @@ -118,34 +121,58 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN, - alertees = Alertees}) -> - AN1 = SetFun(Node, AN), - BeforeSz = sets:size(AN), - AfterSz = sets:size(AN1), +dict_unappend_all(Key, _Val, Dict) -> + dict:erase(Key, Dict). + +dict_unappend(Key, Val, Dict) -> + case lists:delete(Val, dict:fetch(Key, Dict)) of + [] -> dict:erase(Key, Dict); + X -> dict:store(Key, X, Dict) + end. + +count_dict_values(Val, Dict) -> + dict:fold(fun (_Node, List, Count) -> + Count + case lists:member(Val, List) of + true -> 1; + false -> 0 + end + end, 0, Dict). + +maybe_alert(UpdateFun, Node, Source, + State = #alarms{alarmed_nodes = AN, + alertees = Alertees}) -> + AN1 = UpdateFun(Node, Source, AN), + BeforeSz = count_dict_values(Source, AN), + AfterSz = count_dict_values(Source, AN1), + %% If we have changed our alarm state, inform the remotes. IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees); - IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees); - true -> ok + if IsLocal andalso BeforeSz < AfterSz -> + ok = alert_remote(true, Alertees, Source); + IsLocal andalso BeforeSz > AfterSz -> + ok = alert_remote(false, Alertees, Source); + true -> + ok end, %% If the overall alarm state has changed, inform the locals. - case {BeforeSz, AfterSz} of - {0, 1} -> ok = alert_local(true, Alertees); - {1, 0} -> ok = alert_local(false, Alertees); + case {dict:size(AN), dict:size(AN1)} of + {0, 1} -> ok = alert_local(true, Alertees, Source); + {1, 0} -> ok = alert_local(false, Alertees, Source); {_, _} -> ok end, State#alarms{alarmed_nodes = AN1}. -alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2). +alert_local(Alert, Alertees, _Source) -> + alert(Alertees, [Alert], fun erlang:'=:='/2). -alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2). +alert_remote(Alert, Alertees, Source) -> + alert(Alertees, [Source, Alert], fun erlang:'=/='/2). -alert(Alert, Alertees, NodeComparator) -> +alert(Alertees, AlertArg, NodeComparator) -> Node = node(), dict:fold(fun (Pid, {M, F, A}, ok) -> case NodeComparator(Node, node(Pid)) of - true -> apply(M, F, A ++ [Pid, Alert]); + true -> apply(M, F, A ++ [Pid] ++ AlertArg); false -> ok end end, ok, Alertees). @@ -153,9 +180,9 @@ alert(Alert, Alertees, NodeComparator) -> internal_register(Pid, {M, F, A} = HighMemMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), - case sets:is_element(node(), State#alarms.alarmed_nodes) of - true -> ok = apply(M, F, A ++ [Pid, true]); - false -> ok + case dict:find(node(), State#alarms.alarmed_nodes) of + {ok, _Sources} -> apply(M, F, A ++ [Pid, true]); + error -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertees), State#alarms{alertees = NewAlertees}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9b6f14ca..9ecbcbc3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,7 +32,7 @@ %% internal --export([internal_declare/2, internal_delete/1, run_backing_queue/3, +-export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -144,11 +144,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/1 :: - (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/2 :: + (name(), pid()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -231,7 +231,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), + false -> TailFun = internal_delete(QueueName, QPid), fun () -> TailFun(), ExistingQ end end end @@ -330,54 +330,47 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of - ok -> ok; - {error, Error} -> rabbit_misc:protocol_error( - precondition_failed, - "invalid arg '~s' for ~s: ~255p", - [Key, rabbit_misc:rs(QueueName), Error]) - end || - {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/2}, - {<<"x-message-ttl">>, fun check_integer_argument/2}, - {<<"x-ha-policy">>, fun check_ha_policy_argument/2}, - {<<"x-dead-letter-exchange">>, fun check_string_argument/2}, - {<<"x-dead-letter-routing-key">>, - fun check_dlxrk_argument/2}]], + Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, + {<<"x-message-ttl">>, fun check_positive_int_arg/2}, + {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, + {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], + [case rabbit_misc:table_lookup(Args, Key) of + undefined -> ok; + TypeVal -> case Fun(TypeVal, Args) of + ok -> ok; + {error, Error} -> rabbit_misc:protocol_error( + precondition_failed, + "invalid arg '~s' for ~s: ~255p", + [Key, rabbit_misc:rs(QueueName), + Error]) + end + end || {Key, Fun} <- Checks], ok. -check_string_argument(undefined, _Args) -> - ok; -check_string_argument({longstr, _}, _Args) -> +check_string_arg({longstr, _}, _Args) -> ok; -check_string_argument({Type, _}, _) -> +check_string_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. -check_integer_argument(undefined, _Args) -> - ok; -check_integer_argument({Type, Val}, _Args) when Val > 0 -> +check_positive_int_arg({Type, Val}, _Args) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of - true -> ok; - false -> {error, {unacceptable_type, Type}} - end; -check_integer_argument({_Type, Val}, _Args) -> - {error, {value_zero_or_less, Val}}. + false -> {error, {unacceptable_type, Type}}; + true when Val =< 0 -> {error, {value_zero_or_less, Val}}; + true -> ok + end. -check_dlxrk_argument(undefined, _Args) -> - ok; -check_dlxrk_argument({longstr, _}, Args) -> +check_dlxrk_arg({longstr, _}, Args) -> case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of undefined -> {error, routing_key_but_no_dlx_defined}; _ -> ok end; -check_dlxrk_argument({Type, _}, _Args) -> +check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_argument(undefined, _Args) -> - ok; -check_ha_policy_argument({longstr, <<"all">>}, _Args) -> +check_ha_policy_arg({longstr, <<"all">>}, _Args) -> ok; -check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> +check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of undefined -> {error, {require, 'x-ha-policy-params'}}; @@ -393,9 +386,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> {Type, _} -> {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} end; -check_ha_policy_argument({longstr, Policy}, _Args) -> +check_ha_policy_arg({longstr, Policy}, _Args) -> {error, {invalid_ha_policy, Policy}}; -check_ha_policy_argument({Type, _}, _Args) -> +check_ha_policy_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. list() -> @@ -534,13 +527,19 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName) -> +internal_delete(QueueName, QPid) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - rabbit_binding:process_deletions(Deletions) + T = rabbit_binding:process_deletions(Deletions), + fun() -> + ok = T(), + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QueueName}]) + end end end). @@ -555,14 +554,25 @@ set_maximum_since_use(QPid, Age) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid, - slave_pids = []} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])), - rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), Dels)) + fun () -> QsDels = + qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + #amqqueue{name = QName, pid = Pid, + slave_pids = []} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])), + {Qs, Dels} = lists:unzip(QsDels), + T = rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)), + fun () -> + T(), + lists:foreach( + fun({QName, QPid}) -> + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QName}]) + end, Qs) + end end). delete_queue(QueueName) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 106a9960..7c1e4573 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,9 +24,6 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MESSAGE_PROPERTIES, - #message_properties{expiry = undefined, needs_confirming = false}). - -export([start_link/1, info_keys/0]). -export([init_with_backing_queue_state/7]). @@ -51,8 +48,7 @@ ttl, ttl_timer_ref, publish_seqno, - unconfirmed_mq, - unconfirmed_qm, + unconfirmed, delayed_stop, queue_monitors, dlx, @@ -138,8 +134,7 @@ init(Q) -> dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, @@ -164,8 +159,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, expiry_timer_ref = undefined, ttl = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, @@ -183,16 +177,13 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate(Reason, State = #q{q = #amqqueue{name = QName}, backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - terminate_shutdown(fun (BQS) -> - rabbit_event:notify( - queue_deleted, [{pid, self()}, - {name, QName}]), - BQS1 = BQ:delete_and_terminate(Reason, BQS), - %% don't care if the internal delete - %% doesn't return 'ok'. - rabbit_amqqueue:internal_delete(qname(State)), - BQS1 - end, State). + terminate_shutdown( + fun (BQS) -> + BQS1 = BQ:delete_and_terminate(Reason, BQS), + %% don't care if the internal delete doesn't return 'ok'. + rabbit_amqqueue:internal_delete(QName, self()), + BQS1 + end, State). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -333,12 +324,10 @@ ensure_expiry_timer(State = #q{expires = undefined}) -> State; ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of - true -> - NewState = stop_expiry_timer(State), - TRef = erlang:send_after(Expires, self(), maybe_expire), - NewState#q{expiry_timer_ref = TRef}; - false -> - State + true -> NewState = stop_expiry_timer(State), + TRef = erlang:send_after(Expires, self(), maybe_expire), + NewState#q{expiry_timer_ref = TRef}; + false -> State end. ensure_stats_timer(State) -> @@ -532,15 +521,10 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {false, BQS1} -> DeliverFun = fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - %% we don't need an expiry here because - %% messages are not being enqueued, so we use - %% an empty message_properties. - {AckTag, BQS3} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = needs_confirming(Confirm)}, - SenderPid, BQS2), + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, @@ -567,8 +551,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), case Delivered of true -> State2; - false -> Props = (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, + false -> Props = message_properties(Confirm, State), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -696,8 +679,9 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(#q{ttl=TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL)}. +message_properties(Confirm, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL), + needs_confirming = needs_confirming(Confirm)}. calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). @@ -735,18 +719,15 @@ dead_letter_fun(Reason, _State) -> dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> case rabbit_exchange:lookup(DLX) of - {error, not_found} -> - noreply(State); - _ -> - dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State) + {error, not_found} -> noreply(State); + _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, + State) end. dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed_mq = UMQ, - dlx = DLX, - backing_queue = BQ, - backing_queue_state = BQS}) -> + State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX}) -> {ok, _, QPids} = rabbit_basic:publish( rabbit_basic:delivery( @@ -755,20 +736,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State1 = lists:foldl(fun monitor_queue/2, State, QPids), State2 = State1#q{publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_, BQS1} = BQ:ack([AckTag], BQS), - cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> State3 = - lists:foldl( - fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> - UQM1 = rabbit_misc:gb_trees_set_insert( - QPid, MsgSeqNo, UQM), - State0#q{unconfirmed_qm = UQM1} - end, State2, QPids), - noreply(State3#q{ - unconfirmed_mq = - gb_trees:insert( - MsgSeqNo, {gb_sets:from_list(QPids), - AckTag}, UMQ)}) + [] -> cleanup_after_confirm([AckTag], State2); + _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), + noreply(State2#q{unconfirmed = UC1}) end. monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> @@ -787,65 +757,31 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed_qm = UQM}) -> + unconfirmed = UC}) -> case dict:find(QPid, QMons) of error -> noreply(State); {ok, _} -> - #resource{name = QName} = qname(State), - rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]), - case gb_trees:lookup(QPid, UQM) of - none -> - noreply(State); - {value, MsgSeqNosSet} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> rabbit_log:warning( - "Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]); - false -> ok - end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, - State#q{queue_monitors = - dict:erase(QPid, QMons)}) - end + rabbit_log:info("DLQ ~p (for ~s) died~n", + [QPid, rabbit_misc:rs(qname(State))]), + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + case (MsgSeqNoAckTags =/= [] andalso + rabbit_misc:is_abnormal_termination(Reason)) of + true -> rabbit_log:warning("Dead queue lost ~p messages~n", + [length(MsgSeqNoAckTags)]); + false -> ok + end, + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = dict:erase(QPid, QMons), + unconfirmed = UC1}) end. -handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags1, UMQ3} = - lists:foldl( - fun (MsgSeqNo, {AckTags, UMQ1}) -> - {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), - QPids1 = gb_sets:delete(QPid, QPids), - case gb_sets:is_empty(QPids1) of - true -> {[AckTag | AckTags], - gb_trees:delete(MsgSeqNo, UMQ1)}; - false -> {AckTags, gb_trees:update( - MsgSeqNo, {QPids1, AckTag}, UMQ1)} - end - end, {[], UMQ}, MsgSeqNos), - {_Guids, BQS1} = BQ:ack(AckTags1, BQS), - MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), - gb_sets:from_list(MsgSeqNos)), - State1 = case gb_sets:is_empty(MsgSeqNos1) of - false -> State#q{ - unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)}; - true -> demonitor_queue( - QPid, State#q{ - unconfirmed_qm = - gb_trees:delete(QPid, UQM)}) - end, - cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, - backing_queue_state = BQS1}). - stop_later(Reason, State) -> stop_later(Reason, undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> - case {gb_trees:is_empty(UMQ), Reply} of +stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> + case {dtree:is_empty(UC), Reply} of {true, noreply} -> {stop, Reason, State}; {true, _} -> @@ -854,16 +790,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) end. -cleanup_after_confirm(State = #q{delayed_stop = DS, - unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso DS =/= undefined of +cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, + unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of {_, {_, noreply}} -> ok; {_, {From, Reply}} -> gen_server2:reply(From, Reply) end, {Reason, _} = DS, - {stop, Reason, State}; - false -> noreply(State) + {stop, Reason, State1}; + false -> noreply(State1) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -1244,8 +1184,14 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State) -> - handle_confirm(MsgSeqNos, QPid, State); +handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> + {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), + State1 = case dtree:is_defined(QPid, UC1) of + false -> demonitor_queue(QPid, State); + true -> State + end, + cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State1#q{unconfirmed = UC1}); handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); @@ -1285,14 +1231,14 @@ handle_cast({ack, AckTags, ChPid}, State) -> handle_cast({reject, AckTags, Requeue, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case Requeue of - true -> requeue_and_run(AckTags, State1); - false -> Fun = dead_letter_fun(rejected, State), + case Requeue of + true -> fun (State1) -> requeue_and_run(AckTags, State1) end; + false -> Fun = dead_letter_fun(rejected, State), + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> BQS1 = BQ:fold(Fun, BQS, AckTags), State1#q{backing_queue_state = BQS1} - end + end end)); handle_cast(delete_immediately, State) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 25485ca0..a89aa074 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -199,9 +199,9 @@ replace_headers(Headers, Content = #content{properties = Props}) -> indexof(L, Element) -> indexof(L, Element, 1). -indexof([], _Element, _N) -> 0; -indexof([Element | _Rest], Element, N) -> N; -indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). +indexof([], _Element, _N) -> 0; +indexof([Element | _Rest], Element, N) -> N; +indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bb6636eb..4a0e93be 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,8 +37,8 @@ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + stats_timer, confirm_enabled, publish_seqno, unconfirmed, + confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, @@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), - record_confirms(MXs, State1). - -process_confirms(MsgSeqNos, QPid, Nack, State) -> - lists:foldl( - fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); - none -> Acc - end - end, {[], State}, MsgSeqNos). - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, - {MXs, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}}, - Nack) -> - State1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> UQM1 = gb_trees:delete(QPid, UQM), - State#ch{unconfirmed_qm = UQM1}; - false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State#ch{unconfirmed_qm = UQM1} - end; - none -> - State - end, - Qs1 = gb_sets:del_element(QPid, Qs), - %% If QPid somehow died initiating a nack, clear the message from - %% internal data-structures. Also, cleanup empty entries. - case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), - {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; - false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), - {MXs, State1#ch{unconfirmed_mq = UMQ1}} - end. +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1152,22 +1115,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> true -> State end. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] - end, - %% We remove the MsgSeqNos from UQM before calling - %% process_confirms to prevent each MsgSeqNo being removed from - %% the set one by one which which would be inefficient - State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {Nack, SendFun} = - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {true, fun send_nacks/2}; - false -> {false, fun record_confirms/2} - end, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - SendFun(MXs, State2). +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(QPid, UC), + (case rabbit_misc:is_abnormal_termination(Reason) of + true -> fun send_nacks/2; + false -> fun record_confirms/2 + end)(MXs, State#ch{unconfirmed = UC1}). handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, @@ -1392,30 +1345,16 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ} = State, - UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), - SingletonSet = gb_sets:singleton(MsgSeqNo), - lists:foldl( - fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> - case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State0#ch{unconfirmed_qm = UQM1}; - none -> - UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - State0#ch{unconfirmed_qm = UQM1} - end - end, State#ch{unconfirmed_mq = UMQ1}, QPids). + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, + State#ch.unconfirmed)}. send_nacks([], State) -> State; send_nacks(MXs, State = #ch{tx_status = none}) -> - MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], - coalesce_and_send(MsgSeqNos, + coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, - multiple = Multiple} + multiple = Multiple} end, State); send_nacks(_, State) -> maybe_complete_tx(State#ch{tx_status = failed}). @@ -1445,11 +1384,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UMQ) of + CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo + false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1463,8 +1402,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, maybe_complete_tx(State = #ch{tx_status = in_progress}) -> State; -maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) of +maybe_complete_tx(State = #ch{unconfirmed = UC}) -> + case dtree:is_empty(UC) of false -> State; true -> complete_tx(State#ch{confirmed = []}) end. @@ -1492,8 +1431,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> - gb_trees:size(UMQ); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> + dtree:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6a775adf..51f88c8f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -328,7 +328,7 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> 0 -> Arg ++ ".0"; _ -> Arg end), - Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]), + Inform("Setting memory threshold on ~p to ~p", [Node, Frac]), rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl new file mode 100644 index 00000000..831d5290 --- /dev/null +++ b/src/rabbit_disk_monitor.erl @@ -0,0 +1,196 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_disk_monitor). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0, + set_check_interval/1, get_disk_free/0]). + +-define(SERVER, ?MODULE). +-define(DEFAULT_DISK_CHECK_INTERVAL, 60000). + +-record(state, {dir, + limit, + timeout, + timer, + alarmed + }). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(disk_free_limit() :: (integer() | {'mem_relative', float()})). +-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()). +-spec(get_disk_free_limit/0 :: () -> integer()). +-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok'). +-spec(get_check_interval/0 :: () -> integer()). +-spec(set_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_disk_free/0 :: () -> (integer() | 'unknown')). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +get_disk_free_limit() -> + gen_server:call(?MODULE, get_disk_free_limit, infinity). + +set_disk_free_limit(Limit) -> + gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). + +get_check_interval() -> + gen_server:call(?MODULE, get_check_interval, infinity). + +set_check_interval(Interval) -> + gen_server:call(?MODULE, {set_check_interval, Interval}, infinity). + +get_disk_free() -> + gen_server:call(?MODULE, get_disk_free, infinity). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +init([Limit]) -> + TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL), + Dir = dir(), + State = #state { dir = Dir, + timeout = ?DEFAULT_DISK_CHECK_INTERVAL, + timer = TRef, + alarmed = false}, + case {catch get_disk_free(Dir), + vm_memory_monitor:get_total_memory()} of + {N1, N2} when is_integer(N1), is_integer(N2) -> + {ok, set_disk_limits(State, Limit)}; + _ -> + rabbit_log:info("Disabling disk free space monitoring " + "on unsupported platform~n"), + {stop, unsupported_platform} + end. + +handle_call(get_disk_free_limit, _From, State) -> + {reply, interpret_limit(State#state.limit), State}; + +handle_call({set_disk_free_limit, Limit}, _From, State) -> + {reply, ok, set_disk_limits(State, Limit)}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_disk_free, _From, State = #state { dir = Dir }) -> + {reply, get_disk_free(Dir), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(update, State) -> + {noreply, internal_update(State)}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% Server Internals +%%---------------------------------------------------------------------------- + +% the partition / drive containing this directory will be monitored +dir() -> rabbit_mnesia:dir(). + +set_disk_limits(State, Limit) -> + State1 = State#state { limit = Limit }, + rabbit_log:info("Disk free limit set to ~pMB~n", + [trunc(interpret_limit(Limit) / 1048576)]), + internal_update(State1). + +internal_update(State = #state { limit = Limit, + dir = Dir, + alarmed = Alarmed}) -> + CurrentFreeBytes = get_disk_free(Dir), + LimitBytes = interpret_limit(Limit), + NewAlarmed = CurrentFreeBytes < LimitBytes, + case {Alarmed, NewAlarmed} of + {false, true} -> + emit_update_info("exceeded", CurrentFreeBytes, LimitBytes), + alarm_handler:set_alarm({{resource_limit, disk, node()}, []}); + {true, false} -> + emit_update_info("below limit", CurrentFreeBytes, LimitBytes), + alarm_handler:clear_alarm({resource_limit, disk, node()}); + _ -> + ok + end, + State #state {alarmed = NewAlarmed}. + +get_disk_free(Dir) -> + get_disk_free(Dir, os:type()). + +get_disk_free(Dir, {unix, Sun}) + when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> + parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir)); +get_disk_free(Dir, {unix, _}) -> + parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); +get_disk_free(Dir, {win32, _}) -> + parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir)); +get_disk_free(_, _) -> + unknown. + +parse_free_unix(CommandResult) -> + [_, Stats | _] = string:tokens(CommandResult, "\n"), + [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"), + list_to_integer(Free) * 1024. + +parse_free_win32(CommandResult) -> + LastLine = lists:last(string:tokens(CommandResult, "\r\n")), + [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "), + list_to_integer(Free). + +interpret_limit({mem_relative, R}) -> + round(R * vm_memory_monitor:get_total_memory()); +interpret_limit(L) -> + L. + +emit_update_info(State, CurrentFree, Limit) -> + rabbit_log:info( + "Disk free space limit now ~s. Free bytes:~p Limit:~p~n", + [State, CurrentFree, Limit]). + +start_timer(Timeout) -> + {ok, TRef} = timer:send_interval(Timeout, update), + TRef. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ddf7f574..c1be7613 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,8 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, - gb_trees_set_insert/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -60,6 +59,7 @@ -export([append_rpc_all_nodes/4]). -export([multi_call/2]). -export([quit/1]). +-export([os_cmd/1]). %%---------------------------------------------------------------------------- @@ -176,7 +176,6 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). --spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -204,6 +203,7 @@ -spec(multi_call/2 :: ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). +-spec(os_cmd/1 :: (string()) -> string()). -endif. @@ -717,15 +717,6 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -gb_trees_set_insert(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> - Values1 = gb_sets:insert(Value, Values), - gb_trees:update(Key, Values1, Tree); - none -> - gb_trees:insert(Key, gb_sets:singleton(Value), Tree) - end. - gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). @@ -914,3 +905,10 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. + +os_cmd(Command) -> + Exec = hd(string:tokens(Command, " ")), + case os:find_executable(Exec) of + false -> throw({command_not_found, Exec}); + _ -> os:cmd(Command) + end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 329c07dc..9a972d9e 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -39,15 +39,15 @@ names(Hostname) -> Self = self(), - process_flag(trap_exit, true), - Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), + Ref = make_ref(), + {Pid, MRef} = spawn_monitor( + fun () -> Self ! {Ref, net_adm:names(Hostname)} end), timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), - Res = receive - {names, Names} -> Names; - {'EXIT', Pid, Reason} -> {error, Reason} - end, - process_flag(trap_exit, false), - Res. + receive + {Ref, Names} -> erlang:demonitor(MRef, [flush]), + Names; + {'DOWN', MRef, process, Pid, Reason} -> {error, Reason} + end. diagnostics(Nodes) -> Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 47e796dc..5e9e78d3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/2, server_properties/1]). +-export([conserve_resources/2, server_properties/1]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -38,7 +38,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, conserve_memory, + auth_mechanism, auth_state, conserve_resources, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -133,8 +133,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, +conserve_resources(Pid, Conserve) -> + Pid ! {conserve_resources, Conserve}, ok. server_properties(Protocol) -> @@ -218,7 +218,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf_len = 0, auth_mechanism = none, auth_state = none, - conserve_memory = false, + conserve_resources = false, last_blocked_by = none, last_blocked_at = never}, try @@ -276,8 +276,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); +handle_other({conserve_resources, Conserve}, Deb, State) -> + recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -358,8 +358,8 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -control_throttle(State = #v1{connection_state = CS, - conserve_memory = Mem}) -> +control_throttle(State = #v1{connection_state = CS, + conserve_resources = Mem}) -> case {CS, Mem orelse credit_flow:blocked()} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; @@ -377,9 +377,9 @@ maybe_block(State = #v1{connection_state = blocking}) -> maybe_block(State) -> State. -update_last_blocked_by(State = #v1{conserve_memory = true}) -> - State#v1{last_blocked_by = mem}; -update_last_blocked_by(State = #v1{conserve_memory = false}) -> +update_last_blocked_by(State = #v1{conserve_resources = true}) -> + State#v1{last_blocked_by = resource}; +update_last_blocked_by(State = #v1{conserve_resources = false}) -> State#v1{last_blocked_by = flow}. close_connection(State = #v1{queue_collector = Collector, @@ -701,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - conserve_memory = Conserve}), + State#v1{connection_state = running, + connection = NewConnection, + conserve_resources = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 0965e3b3..bf2b4798 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -52,22 +52,20 @@ start_child(Mod, Args) -> start_child(Mod, Mod, Args). start_child(ChildId, Mod, Args) -> - {ok, _} = supervisor:start_child(?SERVER, - {ChildId, {Mod, start_link, Args}, - transient, ?MAX_WAIT, worker, [Mod]}), - ok. + child_reply(supervisor:start_child(?SERVER, + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]})). start_restartable_child(Mod) -> start_restartable_child(Mod, []). start_restartable_child(Mod, Args) -> Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), - {ok, _} = supervisor:start_child( - ?SERVER, - {Name, {rabbit_restartable_sup, start_link, - [Name, {Mod, start_link, Args}]}, - transient, infinity, supervisor, [rabbit_restartable_sup]}), - ok. + child_reply(supervisor:start_child( + ?SERVER, + {Name, {rabbit_restartable_sup, start_link, + [Name, {Mod, start_link, Args}]}, + transient, infinity, supervisor, [rabbit_restartable_sup]})). stop_child(ChildId) -> case supervisor:terminate_child(?SERVER, ChildId) of @@ -77,3 +75,9 @@ stop_child(ChildId) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. + + +%%---------------------------------------------------------------------------- + +child_reply({ok, _}) -> ok; +child_reply(X) -> X. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 85fe5426..55e4a6f8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2551,7 +2551,7 @@ test_queue_recover() -> {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName) + rabbit_amqqueue:internal_delete(QName, QPid1) end), passed. diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index e36ca437..e1a7bcae 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -28,7 +28,9 @@ -ifdef(use_specs). -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). --spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade_local/0 :: () -> 'ok' | + 'version_not_available' | + 'starting_from_scratch'). -endif. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index dc74b2f5..f3a8cacf 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -24,7 +24,7 @@ send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --record(wstate, {sock, channel, frame_max, protocol}). +-record(wstate, {sock, channel, frame_max, protocol, pending}). -define(HIBERNATE_AFTER, 5000). @@ -80,7 +80,8 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, - protocol = Protocol}])}. + protocol = Protocol, + pending = []}])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, @@ -88,7 +89,8 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, - protocol = Protocol}])}. + protocol = Protocol, + pending = []}])}. mainloop(ReaderPid, State) -> try @@ -98,37 +100,41 @@ mainloop(ReaderPid, State) -> end, done. -mainloop1(ReaderPid, State) -> +mainloop1(ReaderPid, State = #wstate{pending = []}) -> receive Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) + end; +mainloop1(ReaderPid, State) -> + receive + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + after 0 -> + ?MODULE:mainloop1(ReaderPid, flush(State)) end. handle_message({send_command, MethodRecord}, State) -> - ok = internal_send_command_async(MethodRecord, State), - State; + internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), - State; + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - ok = internal_send_command_async(MethodRecord, State), + State1 = flush(internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), - State; + State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), + State1 = flush(internal_send_command_async(MethodRecord, Content, State)), gen_server:reply(From, ok), - State; + State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> - ok = internal_send_command_async(MethodRecord, State), + State1 = internal_send_command_async(MethodRecord, State), rabbit_amqqueue:notify_sent(QPid, ChPid), - State; + State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), + State1 = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), - State; + State1; handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QPid), State; @@ -184,22 +190,6 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. -%% We optimise delivery of small messages. Content-bearing methods -%% require at least three frames. Small messages always fit into -%% that. We hand their frames to the Erlang network functions in one -%% go, which may lead to somewhat more efficient processing in the -%% runtime and a greater chance of coalescing into fewer TCP packets. -%% -%% By contrast, for larger messages, split across many frames, we want -%% to allow interleaving of frames on different channels. Hence we -%% hand them to the Erlang network functions one frame at a time. -send_frames(Fun, Sock, Frames) when length(Frames) =< 3 -> - Fun(Sock, Frames); -send_frames(Fun, Sock, Frames) -> - lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame); - (_Frame, Other) -> Other - end, ok, Frames). - tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). @@ -209,9 +199,44 @@ internal_send_command(Sock, Channel, MethodRecord, Protocol) -> internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, Protocol) -> - ok = send_frames(fun tcp_send/2, Sock, - assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + ok = lists:foldl(fun (Frame, ok) -> tcp_send(Sock, Frame); + (_Frame, Other) -> Other + end, ok, assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). + +internal_send_command_async(MethodRecord, + State = #wstate{channel = Channel, + protocol = Protocol, + pending = Pending}) -> + Frame = assemble_frame(Channel, MethodRecord, Protocol), + maybe_flush(State#wstate{pending = [Frame | Pending]}). + +internal_send_command_async(MethodRecord, Content, + State = #wstate{channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + pending = Pending}) -> + Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax, + Protocol), + maybe_flush(State#wstate{pending = [Frames | Pending]}). + +%% This magic number is the tcp-over-ethernet MSS (1460) minus the +%% minimum size of a AMQP basic.deliver method frame (24) plus basic +%% content header (22). The idea is that we want to flush just before +%% exceeding the MSS. +-define(FLUSH_THRESHOLD, 1414). + +maybe_flush(State = #wstate{pending = Pending}) -> + case iolist_size(Pending) >= ?FLUSH_THRESHOLD of + true -> flush(State); + false -> State + end. + +flush(State = #wstate{pending = []}) -> + State; +flush(State = #wstate{sock = Sock, pending = Pending}) -> + ok = port_cmd(Sock, lists:reverse(Pending)), + State#wstate{pending = []}. %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -231,21 +256,6 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(MethodRecord, - #wstate{sock = Sock, - channel = Channel, - protocol = Protocol}) -> - ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)). - -internal_send_command_async(MethodRecord, Content, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = send_frames(fun port_cmd/2, Sock, - assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). - port_cmd(Sock, Data) -> true = try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 774b7007..8dd8aba8 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -31,6 +31,16 @@ %% the MaxT and MaxR parameters to permit the child to be %% restarted. This may require waiting for longer than Delay. %% +%% Sometimes, you may wish for a transient or intrinsic child to +%% exit abnormally so that it gets restarted, but still log +%% nothing. gen_server will log any exit reason other than +%% 'normal', 'shutdown' or {'shutdown', _}. Thus the exit reason of +%% {'shutdown', 'restart'} is interpreted to mean you wish the +%% child to be restarted according to the delay parameters, but +%% gen_server will not log the error. Thus from gen_server's +%% perspective it's a normal exit, whilst from supervisor's +%% perspective, it's an abnormal exit. +%% %% 4) Added an 'intrinsic' restart type. Like the transient type, this %% type means the child should only be restarted if the child exits %% abnormally. Unlike the transient type, if the child exits @@ -536,6 +546,9 @@ do_restart(permanent, Reason, Child, State) -> restart(Child, State); do_restart(Type, normal, Child, State) -> del_child_and_maybe_shutdown(Type, Child, State); +do_restart({RestartType, Delay}, {shutdown, restart} = Reason, Child, State) + when RestartType =:= transient orelse RestartType =:= intrinsic -> + do_restart_delay({RestartType, Delay}, Reason, Child, State); do_restart(Type, {shutdown, _}, Child, State) -> del_child_and_maybe_shutdown(Type, Child, State); do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index fca55f02..fb184d1a 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -181,10 +181,10 @@ internal_update(State = #state { memory_limit = MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); + alarm_handler:set_alarm({{resource_limit, memory, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); + alarm_handler:clear_alarm({resource_limit, memory, node()}); _ -> ok end, |