summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-02-05 16:24:11 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-02-05 16:24:11 +0000
commit3bd7a5d1847804e944bd92af921a3bfff4906fcb (patch)
tree73d47c777e929c4b8476e3818f771dbb1be7a886
parentaa18879dfc717a8f0685ffca661ef96e7f51bde8 (diff)
parentb4dba4367d50d5b46a08255d486d05f0ede0a1c1 (diff)
downloadrabbitmq-server-3bd7a5d1847804e944bd92af921a3bfff4906fcb.tar.gz
merge bug25980 into default
-rw-r--r--docs/rabbitmqctl.1.xml17
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue_process.erl84
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_channel_interceptor.erl11
-rw-r--r--src/rabbit_control_main.erl5
-rw-r--r--src/rabbit_exchange_type_topic.erl25
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_mnesia.erl1
-rw-r--r--src/rabbit_nodes.erl15
-rw-r--r--src/rabbit_queue_consumers.erl10
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_runtime_parameters.erl64
-rw-r--r--src/rabbit_tests.erl69
-rw-r--r--src/rabbit_upgrade_functions.erl27
-rw-r--r--src/rabbit_variable_queue.erl208
20 files changed, 383 insertions, 214 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d19acd00..a7e42503 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -502,6 +502,23 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_cluster_name</command> <arg choice="req">name</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets the cluster name. The cluster name is announced to
+ clients on connection, and used by the federation and
+ shovel plugins to record where a message has been. The
+ cluster name is by default derived from the hostname of
+ the first node in the cluster, but can be changed.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_cluster_name london</screen>
+ <para role="example">
+ This sets the cluster name to "london".
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 6d117e3d..6f6f4244 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -60,7 +60,7 @@
-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
--record(trie_binding, {exchange_name, node_id, destination}).
+-record(trie_binding, {exchange_name, node_id, destination, arguments}).
-record(listener, {node, protocol, host, ip_address, port}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index da8c0607..32b1a2e0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -22,6 +22,7 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
-export([start_link/1, info_keys/0]).
@@ -327,10 +328,13 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
-next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+next_state(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
+ MTC1 = confirm_messages(MsgIds, MTC),
+ State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1},
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -411,9 +415,9 @@ maybe_send_drained(WasEmpty, State) ->
end,
State.
-confirm_messages([], State) ->
- State;
-confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+confirm_messages([], MTC) ->
+ MTC;
+confirm_messages(MsgIds, MTC) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
@@ -427,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
end
end, {gb_trees:empty(), MTC}, MsgIds),
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
- State#q{msg_id_to_channel = MTC1}.
+ MTC1.
send_or_record_confirm(#delivery{confirm = false}, State) ->
{never, State};
@@ -447,23 +451,22 @@ send_or_record_confirm(#delivery{confirm = true,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-send_mandatory(#delivery{mandatory = false}) ->
+send_mandatory(#delivery{mandatory = false}) ->
ok;
send_mandatory(#delivery{mandatory = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo}) ->
gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
-discard(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo,
- message = #basic_message{id = MsgId}}, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case MsgSeqNo of
- undefined -> State;
- _ -> confirm_messages([MsgId], State)
- end,
+discard(#delivery{confirm = Confirm,
+ sender = SenderPid,
+ message = #basic_message{id = MsgId}}, BQ, BQS, MTC) ->
+ MTC1 = case Confirm of
+ true -> confirm_messages([MsgId], MTC);
+ false -> MTC
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
- State1#q{backing_queue_state = BQS1}.
+ {BQS1, MTC1}.
run_message_queue(State) -> run_message_queue(false, State).
@@ -486,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) ->
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
case rabbit_queue_consumers:deliver(
fun (true) -> true = BQ:is_empty(BQS),
{AckTag, BQS1} = BQ:publish_delivered(
Message, Props, SenderPid, BQS),
- {{Message, Delivered, AckTag},
- State#q{backing_queue_state = BQS1}};
+ {{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
- discard(Delivery, State)}
+ discard(Delivery, BQ, BQS, MTC)}
end, qname(State), State#q.consumers) of
- {delivered, ActiveConsumersChanged, State1, Consumers} ->
+ {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
- State1#q{consumers = Consumers})};
+ State#q{backing_queue_state = BQS1,
+ msg_id_to_channel = MTC1,
+ consumers = Consumers})};
{undelivered, ActiveConsumersChanged, Consumers} ->
{undelivered, maybe_notify_decorators(
ActiveConsumersChanged,
@@ -511,7 +516,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, State),
+ Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State2 = State1#q{backing_queue_state = BQS1},
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
@@ -521,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{delivered, State3} ->
State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined}} ->
- discard(Delivery, State3);
+ {undelivered, State3 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS2,
+ msg_id_to_channel = MTC}} ->
+ {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
+ State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
@@ -833,24 +841,36 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- stat -> 7;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ stat -> 7;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
+ {basic_cancel, _, _, _} -> consumer_bias(State);
+ _ -> 0
end.
-prioritise_cast(Msg, _Len, _State) ->
+prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
+ {ack, _AckTags, _ChPid} -> consumer_bias(State);
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State);
+ {resume, _ChPid} -> consumer_bias(State);
_ -> 0
end.
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case BQ:msg_rates(BQS) of
+ {0.0, _} -> 0;
+ {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> 1;
+ {_, _} -> 0
+ end.
+
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2b561900..3d88be7a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -209,6 +209,10 @@
%% Called immediately before the queue hibernates.
-callback handle_pre_hibernate(state()) -> state().
+%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
+%% inbound messages and outbound messages at the moment.
+-callback msg_rates(state()) -> {float(), float()}.
+
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
-callback status(state()) -> [{atom(), any()}].
@@ -236,7 +240,8 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1},
+ {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4d866908..6b0f3700 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1439,8 +1439,9 @@ notify_limiter(Limiter, Acked) ->
end
end.
-deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- mandatory = false},
+deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
+ confirm = false,
+ mandatory = false},
[]}, State) -> %% optimisation
?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
State;
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 2bd22579..49f7e388 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -51,8 +51,11 @@ behaviour_info(_Other) ->
%%----------------------------------------------------------------------------
-intercept_method(#'basic.publish'{} = M, _VHost) ->
- M;
+intercept_method(#'basic.publish'{} = M, _VHost) -> M;
+intercept_method(#'basic.ack'{} = M, _VHost) -> M;
+intercept_method(#'basic.nack'{} = M, _VHost) -> M;
+intercept_method(#'basic.reject'{} = M, _VHost) -> M;
+intercept_method(#'basic.credit'{} = M, _VHost) -> M;
intercept_method(M, VHost) ->
intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))).
@@ -87,5 +90,7 @@ select(Method) ->
validate_method(M, M2) ->
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+%% keep dialyzer happy
+-spec internal_error(string(), [any()]) -> no_return().
internal_error(Format, Args) ->
- rabbit_misc:protocol_error(internal_error, Format, Args). \ No newline at end of file
+ rabbit_misc:protocol_error(internal_error, Format, Args).
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index f3463286..746f2bdb 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -90,6 +90,7 @@
status,
environment,
report,
+ set_cluster_name,
eval,
close_connection,
@@ -527,6 +528,10 @@ action(report, Node, _Args, _Opts, Inform) ->
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
ok;
+action(set_cluster_name, Node, [Name], _Opts, Inform) ->
+ Inform("Setting cluster name to ~s", [Name]),
+ rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]);
+
action(eval, Node, [Expr], _Opts, _Inform) ->
case erl_scan:string(Expr) of
{ok, Scanned, _} ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 8ba29deb..27b8d1e6 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -79,9 +79,9 @@ remove_bindings(transaction, _X, Bs) ->
[begin
Path = [{FinalNode, _} | _] =
follow_down_get_path(X, split_topic_key(K)),
- trie_remove_binding(X, FinalNode, D),
+ trie_remove_binding(X, FinalNode, D, Args),
remove_path_if_empty(X, Path)
- end || #binding{source = X, key = K, destination = D} <- Bs],
+ end || #binding{source = X, key = K, destination = D, args = Args} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
@@ -91,9 +91,10 @@ assert_args_equivalence(X, Args) ->
%%----------------------------------------------------------------------------
-internal_add_binding(#binding{source = X, key = K, destination = D}) ->
+internal_add_binding(#binding{source = X, key = K, destination = D,
+ args = Args}) ->
FinalNode = follow_down_create(X, split_topic_key(K)),
- trie_add_binding(X, FinalNode, D),
+ trie_add_binding(X, FinalNode, D, Args),
ok.
trie_match(X, Words) ->
@@ -176,7 +177,8 @@ trie_bindings(X, Node) ->
MatchHead = #topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
- destination = '$1'}},
+ destination = '$1',
+ arguments = '_'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
trie_update_node_counts(X, Node, Field, Delta) ->
@@ -213,20 +215,21 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
node_id = ToNode},
write).
-trie_add_binding(X, Node, D) ->
+trie_add_binding(X, Node, D, Args) ->
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
- trie_binding_op(X, Node, D, fun mnesia:write/3).
+ trie_binding_op(X, Node, D, Args, fun mnesia:write/3).
-trie_remove_binding(X, Node, D) ->
+trie_remove_binding(X, Node, D, Args) ->
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
- trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
+ trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3).
-trie_binding_op(X, Node, D, Op) ->
+trie_binding_op(X, Node, D, Args, Op) ->
ok = Op(rabbit_topic_trie_binding,
#topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
- destination = D}},
+ destination = D,
+ arguments = Args}},
write).
trie_remove_all_nodes(X) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9ce5afcb..b272c64f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:msg_rates(BQS).
+
status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:status(BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index da185dce..1f31b5c8 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- BQS = bq_init(BQ, Q1, []),
+ BQS = bq_init(BQ, Q1, new),
State = #state { q = Q1,
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a3fd068f..b10f79c5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -70,6 +70,7 @@
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
+-export([moving_average/4]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -248,6 +249,8 @@
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
+-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
+ -> float()).
-endif.
%%----------------------------------------------------------------------------
@@ -1060,6 +1063,12 @@ stop_timer(State, Idx) ->
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
+moving_average(_Time, _HalfLife, Next, undefined) ->
+ Next;
+moving_average(Time, HalfLife, Next, Current) ->
+ Weight = math:exp(Time * math:log(0.5) / HalfLife),
+ Next * (1 - Weight) + Current * Weight.
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index f27f77c6..59873ffc 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -327,6 +327,7 @@ status() ->
case is_running() of
true -> RunningNodes = cluster_nodes(running),
[{running_nodes, RunningNodes},
+ {cluster_name, rabbit_nodes:cluster_name()},
{partitions, mnesia_partitions(RunningNodes)}];
false -> []
end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 5a1613a7..c5aa8473 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,8 @@
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
- is_running/2, is_process_running/2, fqdn_nodename/0]).
+ is_running/2, is_process_running/2,
+ cluster_name/0, set_cluster_name/1]).
-include_lib("kernel/include/inet.hrl").
@@ -37,7 +38,8 @@
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
--spec(fqdn_nodename/0 :: () -> binary()).
+-spec(cluster_name/0 :: () -> binary()).
+-spec(set_cluster_name/1 :: (binary()) -> 'ok').
-endif.
@@ -111,8 +113,15 @@ is_process_running(Node, Process) ->
P when is_pid(P) -> true
end.
-fqdn_nodename() ->
+cluster_name() ->
+ rabbit_runtime_parameters:value_global(
+ cluster_name, cluster_name_default()).
+
+cluster_name_default() ->
{ID, _} = rabbit_nodes:parts(node()),
{ok, Host} = inet:gethostname(),
{ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
+
+set_cluster_name(Name) ->
+ rabbit_runtime_parameters:set_global(cluster_name, Name).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index bea7e0d0..c9540da8 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -27,6 +27,9 @@
-define(UNSENT_MESSAGE_LIMIT, 200).
+%% Utilisation average calculations are all in μs.
+-define(USE_AVG_HALF_LIFE, 1000000.0).
+
-record(state, {consumers, use}).
-record(consumer, {tag, ack_required, args}).
@@ -430,11 +433,6 @@ update_use({inactive, Since, Active, Avg}, active) ->
use_avg(Active, Inactive, Avg) ->
Time = Inactive + Active,
- Ratio = Active / Time,
- Weight = erlang:min(1, Time / 1000000),
- case Avg of
- undefined -> Ratio;
- _ -> Ratio * Weight + Avg * (1 - Weight)
- end.
+ rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index b5a316f0..919b7376 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -358,9 +358,10 @@ start(DurableQueueNames) ->
%% Any queue directory we've not been asked to recover is considered garbage
QueuesDir = queues_dir(),
- [rabbit_file:recursive_delete([QueueDir]) ||
- QueueDir <- all_queue_directory_names(QueuesDir),
- not sets:is_element(filename:basename(QueueDir), DurableDirectories)],
+ rabbit_file:recursive_delete(
+ [filename:join(QueuesDir, DirName) ||
+ DirName <- all_queue_directory_names(QueuesDir),
+ not sets:is_element(DirName, DurableDirectories)]),
rabbit_recovery_terms:clear(),
@@ -373,9 +374,8 @@ stop() -> rabbit_recovery_terms:stop().
all_queue_directory_names(Dir) ->
case rabbit_file:list_dir(Dir) of
- {ok, Entries} -> [ Entry || Entry <- Entries,
- rabbit_file:is_dir(
- filename:join(Dir, Entry)) ];
+ {ok, Entries} -> [E || E <- Entries,
+ rabbit_file:is_dir(filename:join(Dir, E))];
{error, enoent} -> []
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 64debcab..47bc99d8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -156,19 +156,23 @@ server_properties(Protocol) ->
[case X of
{KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
longstr,
- list_to_binary(Value)};
+ maybe_list_to_binary(Value)};
{BinKey, Type, Value} -> {BinKey, Type, Value}
end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]]],
+ [{product, Product},
+ {version, Version},
+ {cluster_name, rabbit_nodes:cluster_name()},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favour of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
+maybe_list_to_binary(V) when is_binary(V) -> V;
+maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V).
+
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
@@ -957,6 +961,9 @@ validate_negotiated_integer_value(Field, Min, ClientValue) ->
ok
end.
+%% keep dialyzer happy
+-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) ->
+ no_return().
fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
{S1, S2} = case MinOrMax of
min -> {lower, minimum};
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index bcde0078..18b9fbb8 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -22,6 +22,8 @@
list_component/1, list/2, list_formatted/1, lookup/3,
value/3, value/4, info_keys/0]).
+-export([set_global/2, value_global/1, value_global/2]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -34,6 +36,7 @@
-> ok_or_error_string()).
-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term())
-> ok_or_error_string()).
+-spec(set_global/2 :: (atom(), term()) -> 'ok').
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
@@ -48,6 +51,8 @@
-> rabbit_types:infos() | 'not_found').
-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()).
-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()).
+-spec(value_global/1 :: (atom()) -> term() | 'not_found').
+-spec(value_global/2 :: (atom(), term()) -> term()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-endif.
@@ -74,6 +79,10 @@ set(_, <<"policy">>, _, _) ->
set(VHost, Component, Name, Term) ->
set_any(VHost, Component, Name, Term).
+set_global(Name, Term) ->
+ mnesia_update(Name, Term),
+ ok.
+
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
@@ -100,16 +109,22 @@ set_any0(VHost, Component, Name, Term) ->
E
end.
+mnesia_update(Key, Term) ->
+ rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)).
+
mnesia_update(VHost, Comp, Name, Term) ->
- F = fun () ->
- Res = case mnesia:read(?TABLE, {VHost, Comp, Name}, read) of
- [] -> new;
- [Params] -> {old, Params#runtime_parameters.value}
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_vhost:with(VHost, mnesia_update_fun({VHost, Comp, Name}, Term))).
+
+mnesia_update_fun(Key, Term) ->
+ fun () ->
+ Res = case mnesia:read(?TABLE, Key, read) of
+ [] -> new;
+ [Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(VHost, Comp, Name, Term), write),
- Res
- end,
- rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)).
+ ok = mnesia:write(?TABLE, c(Key, Term), write),
+ Res
+ end.
clear(_, <<"policy">> , _) ->
{error_string, "policies may not be cleared using this method"};
@@ -159,43 +174,46 @@ list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
lookup(VHost, Component, Name) ->
- case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
+ case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(VHost, Component, Name) ->
- case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
+value(VHost, Comp, Name) -> value0({VHost, Comp, Name}).
+value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def).
+
+value_global(Key) -> value0(Key).
+value_global(Key, Default) -> value0(Key, Default).
+
+value0(Key) ->
+ case lookup0(Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(VHost, Component, Name, Default) ->
- Params = lookup0(VHost, Component, Name,
- fun () ->
- lookup_missing(VHost, Component, Name, Default)
- end),
+value0(Key, Default) ->
+ Params = lookup0(Key, fun () -> lookup_missing(Key, Default) end),
Params#runtime_parameters.value.
-lookup0(VHost, Component, Name, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of
+lookup0(Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, Key) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(VHost, Component, Name, Default) ->
+lookup_missing(Key, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
- [] -> Record = c(VHost, Component, Name, Default),
+ case mnesia:read(?TABLE, Key, read) of
+ [] -> Record = c(Key, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(VHost, Component, Name, Default) ->
- #runtime_parameters{key = {VHost, Component, Name},
+c(Key, Default) ->
+ #runtime_parameters{key = Key,
value = Default}.
p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1acff127..767aab40 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -558,33 +558,38 @@ test_topic_matching() ->
key = list_to_binary(Key),
destination = #resource{virtual_host = <<"/">>,
kind = queue,
- name = list_to_binary(Q)}} ||
- {Key, Q} <- [{"a.b.c", "t1"},
- {"a.*.c", "t2"},
- {"a.#.b", "t3"},
- {"a.b.b.c", "t4"},
- {"#", "t5"},
- {"#.#", "t6"},
- {"#.b", "t7"},
- {"*.*", "t8"},
- {"a.*", "t9"},
- {"*.b.c", "t10"},
- {"a.#", "t11"},
- {"a.#.#", "t12"},
- {"b.b.c", "t13"},
- {"a.b.b", "t14"},
- {"a.b", "t15"},
- {"b.c", "t16"},
- {"", "t17"},
- {"*.*.*", "t18"},
- {"vodka.martini", "t19"},
- {"a.b.c", "t20"},
- {"*.#", "t21"},
- {"#.*.#", "t22"},
- {"*.#.#", "t23"},
- {"#.#.#", "t24"},
- {"*", "t25"},
- {"#.b.#", "t26"}]],
+ name = list_to_binary(Q)},
+ args = Args} ||
+ {Key, Q, Args} <- [{"a.b.c", "t1", []},
+ {"a.*.c", "t2", []},
+ {"a.#.b", "t3", []},
+ {"a.b.b.c", "t4", []},
+ {"#", "t5", []},
+ {"#.#", "t6", []},
+ {"#.b", "t7", []},
+ {"*.*", "t8", []},
+ {"a.*", "t9", []},
+ {"*.b.c", "t10", []},
+ {"a.#", "t11", []},
+ {"a.#.#", "t12", []},
+ {"b.b.c", "t13", []},
+ {"a.b.b", "t14", []},
+ {"a.b", "t15", []},
+ {"b.c", "t16", []},
+ {"", "t17", []},
+ {"*.*.*", "t18", []},
+ {"vodka.martini", "t19", []},
+ {"a.b.c", "t20", []},
+ {"*.#", "t21", []},
+ {"#.*.#", "t22", []},
+ {"*.#.#", "t23", []},
+ {"#.#.#", "t24", []},
+ {"*", "t25", []},
+ {"#.b.#", "t26", []},
+ {"args-test", "t27",
+ [{<<"foo">>, longstr, <<"bar">>}]},
+ {"args-test", "t27", %% Note aliasing
+ [{<<"foo">>, longstr, <<"baz">>}]}]],
lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
@@ -611,12 +616,13 @@ test_topic_matching() ->
"t22", "t23", "t24", "t26"]},
{"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
{"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
- "t25"]}]),
-
+ "t25"]},
+ {"args-test", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25", "t27"]}]),
%% remove some bindings
RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
lists:nth(11, Bindings), lists:nth(19, Bindings),
- lists:nth(21, Bindings)],
+ lists:nth(21, Bindings), lists:nth(28, Bindings)],
exchange_op_callback(X, remove_bindings, [RemovedBindings]),
RemainingBindings = ordsets:to_list(
ordsets:subtract(ordsets:from_list(Bindings),
@@ -639,7 +645,8 @@ test_topic_matching() ->
{"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
"t24", "t26"]},
{"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]},
+ {"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]),
%% remove the entire exchange
exchange_op_callback(X, delete, [RemainingBindings]),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 90372461..4cb3cacc 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -47,6 +47,7 @@
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
+-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
%% -------------------------------------------------------------------
@@ -355,6 +356,32 @@ internal_system_x() ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+cluster_name() ->
+ {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0),
+ ok.
+
+cluster_name_tx() ->
+ %% mnesia:transform_table/4 does not let us delete records
+ T = rabbit_runtime_parameters,
+ mnesia:write_lock_table(T),
+ Ks = [K || {_VHost, <<"federation">>, <<"local-nodename">>} = K
+ <- mnesia:all_keys(T)],
+ case Ks of
+ [] -> ok;
+ [K|Tl] -> [{runtime_parameters, _K, Name}] = mnesia:read(T, K, write),
+ R = {runtime_parameters, cluster_name, Name},
+ mnesia:write(T, R, write),
+ case Tl of
+ [] -> ok;
+ _ -> {VHost, _, _} = K,
+ error_logger:warning_msg(
+ "Multiple local-nodenames found, picking '~s' "
+ "from '~s' for cluster name~n", [Name, VHost])
+ end
+ end,
+ [mnesia:delete(T, K, write) || K <- Ks],
+ ok.
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8711d139..9d242316 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,8 +21,8 @@
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0]).
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1,
+ status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -277,11 +277,10 @@
unconfirmed,
confirmed,
ack_out_counter,
- ack_in_counter,
- ack_rates
+ ack_in_counter
}).
--record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
+-record(rates, { in, out, ack_in, ack_out, timestamp }).
-record(msg_status,
{ seq_id,
@@ -322,11 +321,11 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
- ingress :: {timestamp(), non_neg_integer()},
- avg_egress :: float(),
- avg_ingress :: float(),
- timestamp :: timestamp() }).
+-type(rates() :: #rates { in :: float(),
+ out :: float(),
+ ack_in :: float(),
+ ack_out :: float(),
+ timestamp :: timestamp()}).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer(),
@@ -368,8 +367,7 @@
unconfirmed :: gb_set(),
confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
- ack_in_counter :: non_neg_integer(),
- ack_rates :: rates() }).
+ ack_in_counter :: non_neg_integer() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -384,6 +382,18 @@
count = 0,
end_seq_id = Z }).
+-define(MICROS_PER_SECOND, 1000000.0).
+
+%% We're sampling every 5s for RAM duration; a half life that is of
+%% the same order of magnitude is probably about right.
+-define(RATE_AVG_HALF_LIFE, 5.0).
+
+%% We will recalculate the #rates{} every time we get asked for our
+%% RAM duration, or every N messages published, whichever is
+%% sooner. We do this since the priority calculations in
+%% rabbit_amqqueue_process need fairly fresh rates.
+-define(MSGS_PER_RATE_CALC, 100).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -540,14 +550,18 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
- PCount1 = PCount + one_if(IsPersistent1),
+ InCount1 = InCount + 1,
+ PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(
- inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }))).
+ State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }),
+ a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of
+ true -> update_rates(State3);
+ false -> State3
+ end)).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -629,6 +643,31 @@ drop(AckRequired, State) ->
ack([], State) ->
{[], State};
+%% optimisation: this head is essentially a partial evaluation of the
+%% general case below, for the single-ack case.
+ack([SeqId], State) ->
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
+ remove_pending_ack(SeqId, State),
+ IndexState1 = case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState);
+ false -> IndexState
+ end,
+ case MsgOnDisk of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
+ PCount1 = PCount - one_if(IsPersistent),
+ {[MsgId],
+ a(State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + 1 })};
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
@@ -696,10 +735,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
set_ram_duration_target(
DurationTarget, State = #vqstate {
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
@@ -716,29 +755,43 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
-ram_duration(State = #vqstate {
- rates = #rates { timestamp = Timestamp,
- egress = Egress,
- ingress = Ingress } = Rates,
- ack_rates = #rates { timestamp = AckTimestamp,
- egress = AckEgress,
- ingress = AckIngress } = ARates,
- in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount,
- ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev,
- ram_pending_ack = RPA,
- ram_ack_count_prev = RamAckCountPrev }) ->
- Now = now(),
- {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
- {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
-
- {AvgAckEgressRate, AckEgress1} =
- update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
- {AvgAckIngressRate, AckIngress1} =
- update_rate(Now, AckTimestamp, AckInCount, AckIngress),
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
+ ack_out = AckOutRate,
+ timestamp = TS }}) ->
+ Now = erlang:now(),
+
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
+ timestamp = Now },
+
+ State#vqstate{ in_counter = 0,
+ out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0,
+ rates = Rates }.
+
+update_rate(Now, TS, Count, Rate) ->
+ Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND,
+ rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate).
+
+ram_duration(State) ->
+ State1 = #vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
+ ram_msg_count = RamMsgCount,
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_pending_ack = RPA,
+ ram_ack_count_prev = RamAckCountPrev } =
+ update_rates(State),
RamAckCount = gb_trees:size(RPA),
@@ -752,25 +805,7 @@ ram_duration(State = #vqstate {
AvgAckEgressRate + AvgAckIngressRate))
end,
- {Duration, State #vqstate {
- rates = Rates #rates {
- egress = Egress1,
- ingress = Ingress1,
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate,
- timestamp = Now },
- ack_rates = ARates #rates {
- egress = AckEgress1,
- ingress = AckIngress1,
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate,
- timestamp = Now },
- in_counter = 0,
- out_counter = 0,
- ack_in_counter = 0,
- ack_out_counter = 0,
- ram_msg_count_prev = RamMsgCount,
- ram_ack_count_prev = RamAckCount }}.
+ {Duration, State1}.
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
@@ -796,6 +831,10 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate } }) ->
+ {AvgIngressRate, AvgEgressRate}.
+
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
@@ -805,10 +844,11 @@ status(#vqstate {
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate } }) ->
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate }}) ->
+
[ {q1 , ?QUEUE:len(Q1)},
{q2 , ?QUEUE:len(Q2)},
{delta , Delta},
@@ -998,10 +1038,6 @@ expand_delta(SeqId, #delta { count = Count,
expand_delta(_SeqId, #delta { count = Count } = Delta) ->
d(Delta #delta { count = Count + 1 }).
-update_rate(Now, Then, Count, {OThen, OCount}) ->
- %% avg over the current period and the previous
- {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}.
-
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -1046,22 +1082,21 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_ack_count_prev = 0,
out_counter = 0,
in_counter = 0,
- rates = blank_rate(Now, DeltaCount1),
+ rates = blank_rates(Now),
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
confirmed = gb_sets:new(),
ack_out_counter = 0,
- ack_in_counter = 0,
- ack_rates = blank_rate(Now, 0) },
+ ack_in_counter = 0 },
a(maybe_deltas_to_betas(State)).
-blank_rate(Timestamp, IngressLength) ->
- #rates { egress = {Timestamp, 0},
- ingress = {Timestamp, IngressLength},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Timestamp }.
+blank_rates(Now) ->
+ #rates { in = 0.0,
+ out = 0.0,
+ ack_in = 0.0,
+ ack_out = 0.0,
+ timestamp = Now}.
in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
@@ -1535,11 +1570,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
- rates = #rates { avg_ingress = AvgIngress,
- avg_egress = AvgEgress },
- ack_rates = #rates { avg_ingress = AvgAckIngress,
- avg_egress = AvgAckEgress }
- }) ->
+ rates = #rates { in = AvgIngress,
+ out = AvgEgress,
+ ack_in = AvgAckIngress,
+ ack_out = AvgAckEgress } }) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of