diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-05 16:24:11 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-05 16:24:11 +0000 |
commit | 3bd7a5d1847804e944bd92af921a3bfff4906fcb (patch) | |
tree | 73d47c777e929c4b8476e3818f771dbb1be7a886 | |
parent | aa18879dfc717a8f0685ffca661ef96e7f51bde8 (diff) | |
parent | b4dba4367d50d5b46a08255d486d05f0ede0a1c1 (diff) | |
download | rabbitmq-server-3bd7a5d1847804e944bd92af921a3bfff4906fcb.tar.gz |
merge bug25980 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 17 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 84 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
-rw-r--r-- | src/rabbit_channel_interceptor.erl | 11 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 5 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 25 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 1 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 15 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 10 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 12 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 19 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 64 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 69 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 27 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 208 |
20 files changed, 383 insertions, 214 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d19acd00..a7e42503 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -502,6 +502,23 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>set_cluster_name</command> <arg choice="req">name</arg></cmdsynopsis></term> + <listitem> + <para> + Sets the cluster name. The cluster name is announced to + clients on connection, and used by the federation and + shovel plugins to record where a message has been. The + cluster name is by default derived from the hostname of + the first node in the cluster, but can be changed. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_cluster_name london</screen> + <para role="example"> + This sets the cluster name to "london". + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 6d117e3d..6f6f4244 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,7 +60,7 @@ -record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). --record(trie_binding, {exchange_name, node_id, destination}). +-record(trie_binding, {exchange_name, node_id, destination, arguments}). -record(listener, {node, protocol, host, ip_address, port}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index da8c0607..32b1a2e0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -22,6 +22,7 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster -export([start_link/1, info_keys/0]). @@ -327,10 +328,13 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. -next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +next_state(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), + MTC1 = confirm_messages(MsgIds, MTC), + State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}, case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -411,9 +415,9 @@ maybe_send_drained(WasEmpty, State) -> end, State. -confirm_messages([], State) -> - State; -confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> +confirm_messages([], MTC) -> + MTC; +confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -427,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> end end, {gb_trees:empty(), MTC}, MsgIds), rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), - State#q{msg_id_to_channel = MTC1}. + MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; @@ -447,23 +451,22 @@ send_or_record_confirm(#delivery{confirm = true, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. -send_mandatory(#delivery{mandatory = false}) -> +send_mandatory(#delivery{mandatory = false}) -> ok; send_mandatory(#delivery{mandatory = true, sender = SenderPid, msg_seq_no = MsgSeqNo}) -> gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). -discard(#delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo, - message = #basic_message{id = MsgId}}, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case MsgSeqNo of - undefined -> State; - _ -> confirm_messages([MsgId], State) - end, +discard(#delivery{confirm = Confirm, + sender = SenderPid, + message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> + MTC1 = case Confirm of + true -> confirm_messages([MsgId], MTC); + false -> MTC + end, BQS1 = BQ:discard(MsgId, SenderPid, BQS), - State1#q{backing_queue_state = BQS1}. + {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -486,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) -> attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), {AckTag, BQS1} = BQ:publish_delivered( Message, Props, SenderPid, BQS), - {{Message, Delivered, AckTag}, - State#q{backing_queue_state = BQS1}}; + {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, - discard(Delivery, State)} + discard(Delivery, BQ, BQS, MTC)} end, qname(State), State#q.consumers) of - {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, - State1#q{consumers = Consumers})}; + State#q{backing_queue_state = BQS1, + msg_id_to_channel = MTC1, + consumers = Consumers})}; {undelivered, ActiveConsumersChanged, Consumers} -> {undelivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -511,7 +516,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Message, Confirm, State), + Props = message_properties(Message, Confirm, State1), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State2 = State1#q{backing_queue_state = BQS1}, case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, @@ -521,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {delivered, State3} -> State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State3); + {undelivered, State3 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS2, + msg_id_to_channel = MTC}} -> + {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), + State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = @@ -833,24 +841,36 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - stat -> 7; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + stat -> 7; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); + {basic_cancel, _, _, _} -> consumer_bias(State); + _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; + {ack, _AckTags, _ChPid} -> consumer_bias(State); + {notify_sent, _ChPid, _Credit} -> consumer_bias(State); + {resume, _ChPid} -> consumer_bias(State); _ -> 0 end. +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) -> + case BQ:msg_rates(BQS) of + {0.0, _} -> 0; + {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> 1; + {_, _} -> 0 + end. + prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2b561900..3d88be7a 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -209,6 +209,10 @@ %% Called immediately before the queue hibernates. -callback handle_pre_hibernate(state()) -> state(). +%% Used to help prioritisation in rabbit_amqqueue_process. The rate of +%% inbound messages and outbound messages at the moment. +-callback msg_rates(state()) -> {float(), float()}. + %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status -callback status(state()) -> [{atom(), any()}]. @@ -236,7 +240,8 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1}, + {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d866908..6b0f3700 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1439,8 +1439,9 @@ notify_limiter(Limiter, Acked) -> end end. -deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - mandatory = false}, +deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, + confirm = false, + mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), State; diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 2bd22579..49f7e388 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -51,8 +51,11 @@ behaviour_info(_Other) -> %%---------------------------------------------------------------------------- -intercept_method(#'basic.publish'{} = M, _VHost) -> - M; +intercept_method(#'basic.publish'{} = M, _VHost) -> M; +intercept_method(#'basic.ack'{} = M, _VHost) -> M; +intercept_method(#'basic.nack'{} = M, _VHost) -> M; +intercept_method(#'basic.reject'{} = M, _VHost) -> M; +intercept_method(#'basic.credit'{} = M, _VHost) -> M; intercept_method(M, VHost) -> intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))). @@ -87,5 +90,7 @@ select(Method) -> validate_method(M, M2) -> rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). +%% keep dialyzer happy +-spec internal_error(string(), [any()]) -> no_return(). internal_error(Format, Args) -> - rabbit_misc:protocol_error(internal_error, Format, Args).
\ No newline at end of file + rabbit_misc:protocol_error(internal_error, Format, Args). diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index f3463286..746f2bdb 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -90,6 +90,7 @@ status, environment, report, + set_cluster_name, eval, close_connection, @@ -527,6 +528,10 @@ action(report, Node, _Args, _Opts, Inform) -> [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], ok; +action(set_cluster_name, Node, [Name], _Opts, Inform) -> + Inform("Setting cluster name to ~s", [Name]), + rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]); + action(eval, Node, [Expr], _Opts, _Inform) -> case erl_scan:string(Expr) of {ok, Scanned, _} -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 8ba29deb..27b8d1e6 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -79,9 +79,9 @@ remove_bindings(transaction, _X, Bs) -> [begin Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), - trie_remove_binding(X, FinalNode, D), + trie_remove_binding(X, FinalNode, D, Args), remove_path_if_empty(X, Path) - end || #binding{source = X, key = K, destination = D} <- Bs], + end || #binding{source = X, key = K, destination = D, args = Args} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. @@ -91,9 +91,10 @@ assert_args_equivalence(X, Args) -> %%---------------------------------------------------------------------------- -internal_add_binding(#binding{source = X, key = K, destination = D}) -> +internal_add_binding(#binding{source = X, key = K, destination = D, + args = Args}) -> FinalNode = follow_down_create(X, split_topic_key(K)), - trie_add_binding(X, FinalNode, D), + trie_add_binding(X, FinalNode, D, Args), ok. trie_match(X, Words) -> @@ -176,7 +177,8 @@ trie_bindings(X, Node) -> MatchHead = #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, node_id = Node, - destination = '$1'}}, + destination = '$1', + arguments = '_'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). trie_update_node_counts(X, Node, Field, Delta) -> @@ -213,20 +215,21 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> node_id = ToNode}, write). -trie_add_binding(X, Node, D) -> +trie_add_binding(X, Node, D, Args) -> trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), - trie_binding_op(X, Node, D, fun mnesia:write/3). + trie_binding_op(X, Node, D, Args, fun mnesia:write/3). -trie_remove_binding(X, Node, D) -> +trie_remove_binding(X, Node, D, Args) -> trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), - trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3). -trie_binding_op(X, Node, D, Op) -> +trie_binding_op(X, Node, D, Args, Op) -> ok = Op(rabbit_topic_trie_binding, #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, node_id = Node, - destination = D}}, + destination = D, + arguments = Args}}, write). trie_remove_all_nodes(X) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9ce5afcb..b272c64f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2]). + msg_rates/1, status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. +msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:msg_rates(BQS). + status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:status(BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index da185dce..1f31b5c8 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, - BQS = bq_init(BQ, Q1, []), + BQS = bq_init(BQ, Q1, new), State = #state { q = Q1, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index a3fd068f..b10f79c5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -70,6 +70,7 @@ -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). +-export([moving_average/4]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -248,6 +249,8 @@ -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). +-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') + -> float()). -endif. %%---------------------------------------------------------------------------- @@ -1060,6 +1063,12 @@ stop_timer(State, Idx) -> store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). +moving_average(_Time, _HalfLife, Next, undefined) -> + Next; +moving_average(Time, HalfLife, Next, Current) -> + Weight = math:exp(Time * math:log(0.5) / HalfLife), + Next * (1 - Weight) + Current * Weight. + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f27f77c6..59873ffc 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -327,6 +327,7 @@ status() -> case is_running() of true -> RunningNodes = cluster_nodes(running), [{running_nodes, RunningNodes}, + {cluster_name, rabbit_nodes:cluster_name()}, {partitions, mnesia_partitions(RunningNodes)}]; false -> [] end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 5a1613a7..c5aa8473 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,8 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2, fqdn_nodename/0]). + is_running/2, is_process_running/2, + cluster_name/0, set_cluster_name/1]). -include_lib("kernel/include/inet.hrl"). @@ -37,7 +38,8 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). --spec(fqdn_nodename/0 :: () -> binary()). +-spec(cluster_name/0 :: () -> binary()). +-spec(set_cluster_name/1 :: (binary()) -> 'ok'). -endif. @@ -111,8 +113,15 @@ is_process_running(Node, Process) -> P when is_pid(P) -> true end. -fqdn_nodename() -> +cluster_name() -> + rabbit_runtime_parameters:value_global( + cluster_name, cluster_name_default()). + +cluster_name_default() -> {ID, _} = rabbit_nodes:parts(node()), {ok, Host} = inet:gethostname(), {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). + +set_cluster_name(Name) -> + rabbit_runtime_parameters:set_global(cluster_name, Name). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index bea7e0d0..c9540da8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -27,6 +27,9 @@ -define(UNSENT_MESSAGE_LIMIT, 200). +%% Utilisation average calculations are all in μs. +-define(USE_AVG_HALF_LIFE, 1000000.0). + -record(state, {consumers, use}). -record(consumer, {tag, ack_required, args}). @@ -430,11 +433,6 @@ update_use({inactive, Since, Active, Avg}, active) -> use_avg(Active, Inactive, Avg) -> Time = Inactive + Active, - Ratio = Active / Time, - Weight = erlang:min(1, Time / 1000000), - case Avg of - undefined -> Ratio; - _ -> Ratio * Weight + Avg * (1 - Weight) - end. + rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg). now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b5a316f0..919b7376 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -358,9 +358,10 @@ start(DurableQueueNames) -> %% Any queue directory we've not been asked to recover is considered garbage QueuesDir = queues_dir(), - [rabbit_file:recursive_delete([QueueDir]) || - QueueDir <- all_queue_directory_names(QueuesDir), - not sets:is_element(filename:basename(QueueDir), DurableDirectories)], + rabbit_file:recursive_delete( + [filename:join(QueuesDir, DirName) || + DirName <- all_queue_directory_names(QueuesDir), + not sets:is_element(DirName, DurableDirectories)]), rabbit_recovery_terms:clear(), @@ -373,9 +374,8 @@ stop() -> rabbit_recovery_terms:stop(). all_queue_directory_names(Dir) -> case rabbit_file:list_dir(Dir) of - {ok, Entries} -> [ Entry || Entry <- Entries, - rabbit_file:is_dir( - filename:join(Dir, Entry)) ]; + {ok, Entries} -> [E || E <- Entries, + rabbit_file:is_dir(filename:join(Dir, E))]; {error, enoent} -> [] end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 64debcab..47bc99d8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -156,19 +156,23 @@ server_properties(Protocol) -> [case X of {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), longstr, - list_to_binary(Value)}; + maybe_list_to_binary(Value)}; {BinKey, Type, Value} -> {BinKey, Type, Value} end || X <- RawConfigServerProps ++ - [{product, Product}, - {version, Version}, - {platform, "Erlang/OTP"}, - {copyright, ?COPYRIGHT_MESSAGE}, - {information, ?INFORMATION_MESSAGE}]]], + [{product, Product}, + {version, Version}, + {cluster_name, rabbit_nodes:cluster_name()}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]]], %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). +maybe_list_to_binary(V) when is_binary(V) -> V; +maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V). + server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, @@ -957,6 +961,9 @@ validate_negotiated_integer_value(Field, Min, ClientValue) -> ok end. +%% keep dialyzer happy +-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) -> + no_return(). fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> {S1, S2} = case MinOrMax of min -> {lower, minimum}; diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index bcde0078..18b9fbb8 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -22,6 +22,8 @@ list_component/1, list/2, list_formatted/1, lookup/3, value/3, value/4, info_keys/0]). +-export([set_global/2, value_global/1, value_global/2]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -34,6 +36,7 @@ -> ok_or_error_string()). -spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_global/2 :: (atom(), term()) -> 'ok'). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). -spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) @@ -48,6 +51,8 @@ -> rabbit_types:infos() | 'not_found'). -spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()). -spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()). +-spec(value_global/1 :: (atom()) -> term() | 'not_found'). +-spec(value_global/2 :: (atom(), term()) -> term()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -endif. @@ -74,6 +79,10 @@ set(_, <<"policy">>, _, _) -> set(VHost, Component, Name, Term) -> set_any(VHost, Component, Name, Term). +set_global(Name, Term) -> + mnesia_update(Name, Term), + ok. + format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. @@ -100,16 +109,22 @@ set_any0(VHost, Component, Name, Term) -> E end. +mnesia_update(Key, Term) -> + rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)). + mnesia_update(VHost, Comp, Name, Term) -> - F = fun () -> - Res = case mnesia:read(?TABLE, {VHost, Comp, Name}, read) of - [] -> new; - [Params] -> {old, Params#runtime_parameters.value} + rabbit_misc:execute_mnesia_transaction( + rabbit_vhost:with(VHost, mnesia_update_fun({VHost, Comp, Name}, Term))). + +mnesia_update_fun(Key, Term) -> + fun () -> + Res = case mnesia:read(?TABLE, Key, read) of + [] -> new; + [Params] -> {old, Params#runtime_parameters.value} end, - ok = mnesia:write(?TABLE, c(VHost, Comp, Name, Term), write), - Res - end, - rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)). + ok = mnesia:write(?TABLE, c(Key, Term), write), + Res + end. clear(_, <<"policy">> , _) -> {error_string, "policies may not be cleared using this method"}; @@ -159,43 +174,46 @@ list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. lookup(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of + case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. -value(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of +value(VHost, Comp, Name) -> value0({VHost, Comp, Name}). +value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def). + +value_global(Key) -> value0(Key). +value_global(Key, Default) -> value0(Key, Default). + +value0(Key) -> + case lookup0(Key, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> Params#runtime_parameters.value end. -value(VHost, Component, Name, Default) -> - Params = lookup0(VHost, Component, Name, - fun () -> - lookup_missing(VHost, Component, Name, Default) - end), +value0(Key, Default) -> + Params = lookup0(Key, fun () -> lookup_missing(Key, Default) end), Params#runtime_parameters.value. -lookup0(VHost, Component, Name, DefaultFun) -> - case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of +lookup0(Key, DefaultFun) -> + case mnesia:dirty_read(?TABLE, Key) of [] -> DefaultFun(); [R] -> R end. -lookup_missing(VHost, Component, Name, Default) -> +lookup_missing(Key, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {VHost, Component, Name}, read) of - [] -> Record = c(VHost, Component, Name, Default), + case mnesia:read(?TABLE, Key, read) of + [] -> Record = c(Key, Default), mnesia:write(?TABLE, Record, write), Record; [R] -> R end end). -c(VHost, Component, Name, Default) -> - #runtime_parameters{key = {VHost, Component, Name}, +c(Key, Default) -> + #runtime_parameters{key = Key, value = Default}. p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1acff127..767aab40 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -558,33 +558,38 @@ test_topic_matching() -> key = list_to_binary(Key), destination = #resource{virtual_host = <<"/">>, kind = queue, - name = list_to_binary(Q)}} || - {Key, Q} <- [{"a.b.c", "t1"}, - {"a.*.c", "t2"}, - {"a.#.b", "t3"}, - {"a.b.b.c", "t4"}, - {"#", "t5"}, - {"#.#", "t6"}, - {"#.b", "t7"}, - {"*.*", "t8"}, - {"a.*", "t9"}, - {"*.b.c", "t10"}, - {"a.#", "t11"}, - {"a.#.#", "t12"}, - {"b.b.c", "t13"}, - {"a.b.b", "t14"}, - {"a.b", "t15"}, - {"b.c", "t16"}, - {"", "t17"}, - {"*.*.*", "t18"}, - {"vodka.martini", "t19"}, - {"a.b.c", "t20"}, - {"*.#", "t21"}, - {"#.*.#", "t22"}, - {"*.#.#", "t23"}, - {"#.#.#", "t24"}, - {"*", "t25"}, - {"#.b.#", "t26"}]], + name = list_to_binary(Q)}, + args = Args} || + {Key, Q, Args} <- [{"a.b.c", "t1", []}, + {"a.*.c", "t2", []}, + {"a.#.b", "t3", []}, + {"a.b.b.c", "t4", []}, + {"#", "t5", []}, + {"#.#", "t6", []}, + {"#.b", "t7", []}, + {"*.*", "t8", []}, + {"a.*", "t9", []}, + {"*.b.c", "t10", []}, + {"a.#", "t11", []}, + {"a.#.#", "t12", []}, + {"b.b.c", "t13", []}, + {"a.b.b", "t14", []}, + {"a.b", "t15", []}, + {"b.c", "t16", []}, + {"", "t17", []}, + {"*.*.*", "t18", []}, + {"vodka.martini", "t19", []}, + {"a.b.c", "t20", []}, + {"*.#", "t21", []}, + {"#.*.#", "t22", []}, + {"*.#.#", "t23", []}, + {"#.#.#", "t24", []}, + {"*", "t25", []}, + {"#.b.#", "t26", []}, + {"args-test", "t27", + [{<<"foo">>, longstr, <<"bar">>}]}, + {"args-test", "t27", %% Note aliasing + [{<<"foo">>, longstr, <<"baz">>}]}]], lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, Bindings), @@ -611,12 +616,13 @@ test_topic_matching() -> "t22", "t23", "t24", "t26"]}, {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", - "t25"]}]), - + "t25"]}, + {"args-test", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25", "t27"]}]), %% remove some bindings RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), lists:nth(11, Bindings), lists:nth(19, Bindings), - lists:nth(21, Bindings)], + lists:nth(21, Bindings), lists:nth(28, Bindings)], exchange_op_callback(X, remove_bindings, [RemovedBindings]), RemainingBindings = ordsets:to_list( ordsets:subtract(ordsets:from_list(Bindings), @@ -639,7 +645,8 @@ test_topic_matching() -> {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", "t24", "t26"]}, {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, - {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}, + {"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]), %% remove the entire exchange exchange_op_callback(X, delete, [RemainingBindings]), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 90372461..4cb3cacc 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -47,6 +47,7 @@ -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). +-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). %% ------------------------------------------------------------------- @@ -355,6 +356,32 @@ internal_system_x() -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +cluster_name() -> + {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0), + ok. + +cluster_name_tx() -> + %% mnesia:transform_table/4 does not let us delete records + T = rabbit_runtime_parameters, + mnesia:write_lock_table(T), + Ks = [K || {_VHost, <<"federation">>, <<"local-nodename">>} = K + <- mnesia:all_keys(T)], + case Ks of + [] -> ok; + [K|Tl] -> [{runtime_parameters, _K, Name}] = mnesia:read(T, K, write), + R = {runtime_parameters, cluster_name, Name}, + mnesia:write(T, R, write), + case Tl of + [] -> ok; + _ -> {VHost, _, _} = K, + error_logger:warning_msg( + "Multiple local-nodenames found, picking '~s' " + "from '~s' for cluster name~n", [Name, VHost]) + end + end, + [mnesia:delete(T, K, write) || K <- Ks], + ok. + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8711d139..9d242316 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,8 +21,8 @@ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0]). + needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1, + status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -277,11 +277,10 @@ unconfirmed, confirmed, ack_out_counter, - ack_in_counter, - ack_rates + ack_in_counter }). --record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). +-record(rates, { in, out, ack_in, ack_out, timestamp }). -record(msg_status, { seq_id, @@ -322,11 +321,11 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, - ingress :: {timestamp(), non_neg_integer()}, - avg_egress :: float(), - avg_ingress :: float(), - timestamp :: timestamp() }). +-type(rates() :: #rates { in :: float(), + out :: float(), + ack_in :: float(), + ack_out :: float(), + timestamp :: timestamp()}). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer(), @@ -368,8 +367,7 @@ unconfirmed :: gb_set(), confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - ack_rates :: rates() }). + ack_in_counter :: non_neg_integer() }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -384,6 +382,18 @@ count = 0, end_seq_id = Z }). +-define(MICROS_PER_SECOND, 1000000.0). + +%% We're sampling every 5s for RAM duration; a half life that is of +%% the same order of magnitude is probably about right. +-define(RATE_AVG_HALF_LIFE, 5.0). + +%% We will recalculate the #rates{} every time we get asked for our +%% RAM duration, or every N messages published, whichever is +%% sooner. We do this since the priority calculations in +%% rabbit_amqqueue_process need fairly fresh rates. +-define(MSGS_PER_RATE_CALC, 100). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -540,14 +550,18 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, - PCount1 = PCount + one_if(IsPersistent1), + InCount1 = InCount + 1, + PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - a(reduce_memory_use( - inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }))). + State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount1, + persistent_count = PCount1, + unconfirmed = UC1 }), + a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of + true -> update_rates(State3); + false -> State3 + end)). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -629,6 +643,31 @@ drop(AckRequired, State) -> ack([], State) -> {[], State}; +%% optimisation: this head is essentially a partial evaluation of the +%% general case below, for the single-ack case. +ack([SeqId], State) -> + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = + remove_pending_ack(SeqId, State), + IndexState1 = case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState); + false -> IndexState + end, + case MsgOnDisk of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, + PCount1 = PCount - one_if(IsPersistent), + {[MsgId], + a(State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, @@ -696,10 +735,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> set_ram_duration_target( DurationTarget, State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, @@ -716,29 +755,43 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). -ram_duration(State = #vqstate { - rates = #rates { timestamp = Timestamp, - egress = Egress, - ingress = Ingress } = Rates, - ack_rates = #rates { timestamp = AckTimestamp, - egress = AckEgress, - ingress = AckIngress } = ARates, - in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev, - ram_pending_ack = RPA, - ram_ack_count_prev = RamAckCountPrev }) -> - Now = now(), - {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), - {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - - {AvgAckEgressRate, AckEgress1} = - update_rate(Now, AckTimestamp, AckOutCount, AckEgress), - {AvgAckIngressRate, AckIngress1} = - update_rate(Now, AckTimestamp, AckInCount, AckIngress), +update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, + rates = #rates{ in = InRate, + out = OutRate, + ack_in = AckInRate, + ack_out = AckOutRate, + timestamp = TS }}) -> + Now = erlang:now(), + + Rates = #rates { in = update_rate(Now, TS, InCount, InRate), + out = update_rate(Now, TS, OutCount, OutRate), + ack_in = update_rate(Now, TS, AckInCount, AckInRate), + ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), + timestamp = Now }, + + State#vqstate{ in_counter = 0, + out_counter = 0, + ack_in_counter = 0, + ack_out_counter = 0, + rates = Rates }. + +update_rate(Now, TS, Count, Rate) -> + Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND, + rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate). + +ram_duration(State) -> + State1 = #vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, + ram_msg_count = RamMsgCount, + ram_msg_count_prev = RamMsgCountPrev, + ram_pending_ack = RPA, + ram_ack_count_prev = RamAckCountPrev } = + update_rates(State), RamAckCount = gb_trees:size(RPA), @@ -752,25 +805,7 @@ ram_duration(State = #vqstate { AvgAckEgressRate + AvgAckIngressRate)) end, - {Duration, State #vqstate { - rates = Rates #rates { - egress = Egress1, - ingress = Ingress1, - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate, - timestamp = Now }, - ack_rates = ARates #rates { - egress = AckEgress1, - ingress = AckIngress1, - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate, - timestamp = Now }, - in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0, - ram_msg_count_prev = RamMsgCount, - ram_ack_count_prev = RamAckCount }}. + {Duration, State1}. needs_timeout(State = #vqstate { index_state = IndexState, target_ram_count = TargetRamCount }) -> @@ -796,6 +831,10 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate } }) -> + {AvgIngressRate, AvgEgressRate}. + status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, @@ -805,10 +844,11 @@ status(#vqstate { ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate } }) -> + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }}) -> + [ {q1 , ?QUEUE:len(Q1)}, {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, @@ -998,10 +1038,6 @@ expand_delta(SeqId, #delta { count = Count, expand_delta(_SeqId, #delta { count = Count } = Delta) -> d(Delta #delta { count = Count + 1 }). -update_rate(Now, Then, Count, {OThen, OCount}) -> - %% avg over the current period and the previous - {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}. - %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- @@ -1046,22 +1082,21 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_count_prev = 0, out_counter = 0, in_counter = 0, - rates = blank_rate(Now, DeltaCount1), + rates = blank_rates(Now), msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), confirmed = gb_sets:new(), ack_out_counter = 0, - ack_in_counter = 0, - ack_rates = blank_rate(Now, 0) }, + ack_in_counter = 0 }, a(maybe_deltas_to_betas(State)). -blank_rate(Timestamp, IngressLength) -> - #rates { egress = {Timestamp, 0}, - ingress = {Timestamp, IngressLength}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Timestamp }. +blank_rates(Now) -> + #rates { in = 0.0, + out = 0.0, + ack_in = 0.0, + ack_out = 0.0, + timestamp = Now}. in_r(MsgStatus = #msg_status { msg = undefined }, State = #vqstate { q3 = Q3, q4 = Q4 }) -> @@ -1535,11 +1570,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, - rates = #rates { avg_ingress = AvgIngress, - avg_egress = AvgEgress }, - ack_rates = #rates { avg_ingress = AvgAckIngress, - avg_egress = AvgAckEgress } - }) -> + rates = #rates { in = AvgIngress, + out = AvgEgress, + ack_in = AvgAckIngress, + ack_out = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of |