summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-02 16:34:35 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-02 16:34:35 +0000
commitc8940f9ce61112880926d508a09e7595d68f8c37 (patch)
tree75ec59756f7db51a825999dde3c4751e129b62ef
parentf525c97b059b62cf68361cd70e346339aa088582 (diff)
parent950866c469f6f46edba59e3b30cb47c1ba711e1b (diff)
downloadrabbitmq-server-c8940f9ce61112880926d508a09e7595d68f8c37.tar.gz
Starting tracking ack in/out rates
-rw-r--r--src/rabbit_variable_queue.erl144
1 files changed, 101 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f88e49c2..97833991 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -220,6 +220,8 @@
q4,
next_seq_id,
pending_ack,
+ pending_ack_index,
+ ram_ack_index,
index_state,
msg_store_clients,
on_sync,
@@ -230,12 +232,16 @@
persistent_count,
target_ram_msg_count,
+ target_ram_ack_count,
ram_msg_count,
ram_msg_count_prev,
ram_index_count,
out_counter,
in_counter,
- rates
+ ack_out_counter,
+ ack_in_counter,
+ rates,
+ ack_rates
}).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -306,6 +312,7 @@
q4 :: queue(),
next_seq_id :: seq_id(),
pending_ack :: dict:dictionary(),
+ ram_ack_index :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -317,12 +324,16 @@
transient_threshold :: non_neg_integer(),
target_ram_msg_count :: non_neg_integer() | 'infinity',
+ target_ram_ack_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
- rates :: rates() }).
+ ack_out_counter :: non_neg_integer(),
+ ack_in_counter :: non_neg_integer(),
+ rates :: rates(),
+ ack_rates :: rates() }).
-include("rabbit_backing_queue_spec.hrl").
@@ -479,19 +490,17 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
- pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- PA1 = record_pending_ack(m(MsgStatus1), PA),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
+ {SeqId, a(State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ persistent_count = PCount1 })}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -561,8 +570,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount,
- pending_ack = PA }) ->
+ persistent_count = PCount }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -582,12 +590,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
%% 3. If an ack is required, add something sensible to PA
- {AckTag, PA1} = case AckRequired of
- true -> PA2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, PA),
- {SeqId, PA2};
- false -> {blank_ack, PA}
+ {AckTag, State1} = case AckRequired of
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {blank_ack, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
@@ -595,12 +603,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
{{Msg, IsDelivered, AckTag, Len1},
- a(State #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ a(State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1 })}.
ack(AckTags, State) ->
a(ack(fun msg_store_remove/3,
@@ -692,7 +699,7 @@ set_ram_duration_target(DurationTarget,
(TargetRamMsgCount =/= infinity andalso
TargetRamMsgCount1 >= TargetRamMsgCount) of
true -> State1;
- false -> reduce_memory_use(State1)
+ false -> io:format("Reducing~n"), reduce_memory_use(State1)
end).
ram_duration(State = #vqstate {
@@ -962,6 +969,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
q4 = queue:new(),
next_seq_id = NextSeqId,
pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
on_sync = ?BLANK_SYNC,
@@ -977,10 +985,17 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_index_count = 0,
out_counter = 0,
in_counter = 0,
+ ack_out_counter = 0,
+ ack_in_counter = 0,
rates = #rates { egress = {Now, 0},
ingress = {Now, DeltaCount1},
avg_egress = 0.0,
avg_ingress = 0.0,
+ timestamp = Now },
+ ack_rates = #rates { egress = {Now, 0},
+ ingress = {Now, 0},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
timestamp = Now } },
a(maybe_deltas_to_betas(State)).
@@ -1191,20 +1206,31 @@ record_pending_ack(#msg_status { seq_id = SeqId,
guid = Guid,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
- msg_props = MsgProps } = MsgStatus, PA) ->
- AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid, MsgProps};
- false -> MsgStatus
- end,
- dict:store(SeqId, AckEntry, PA).
+ msg_props = MsgProps } = MsgStatus,
+ State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI,
+ ack_in_counter = AckInCount}) ->
+ {AckEntry, RAI1} =
+ case MsgOnDisk of
+ true ->
+ {{IsPersistent, Guid, MsgProps}, RAI};
+ false ->
+ {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ end,
+ PA1 = dict:store(SeqId, AckEntry, PA),
+ State #vqstate { pending_ack = PA1,
+ ram_ack_index = RAI1,
+ ack_in_counter = AckInCount + 1}.
+%% TODO: On remove, need to prevent any seqids that
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
{[], orddict:new()}, PA),
- State1 = State #vqstate { pending_ack = dict:new() },
+ State1 = State #vqstate { pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, GuidsByStore) of
error -> State1;
@@ -1226,13 +1252,17 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount }} =
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI}}) ->
AckEntry = dict:fetch(SeqId, PA),
{accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA) })}
+ pending_ack = dict:erase(SeqId, PA),
+ ram_ack_index =
+ gb_trees:delete_any(SeqId, RAI)})}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
@@ -1241,7 +1271,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount - length(AckTags) }.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1281,21 +1312,48 @@ find_persistent_count(LensByStore) ->
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
- {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
+ Size = gb_trees:size(State #vqstate.ram_ack_index),
+ State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of
+ 0 -> State;
+ S -> io:format("Limiting~n"), limit_ram_acks(S, State)
+ end,
+ {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count,
State #vqstate.target_ram_msg_count) of
- 0 -> {false, State};
- S1 -> {true, AlphaBetaFun(S1, State)}
+ 0 -> {false, State1};
+ S1 -> {true, AlphaBetaFun(S1, State1)}
end,
- case State1 #vqstate.target_ram_msg_count of
- infinity -> {Reduce, State1};
- 0 -> {Reduce, BetaDeltaFun(State1)};
- _ -> case chunk_size(State1 #vqstate.ram_index_count,
- permitted_ram_index_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
- _ -> {Reduce, State1}
+ case State2 #vqstate.target_ram_msg_count of
+ infinity -> {Reduce, State2};
+ 0 -> {Reduce, BetaDeltaFun(State2)};
+ _ -> case chunk_size(State2 #vqstate.ram_index_count,
+ permitted_ram_index_count(State2)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)};
+ _ -> {Reduce, State2}
end
end.
+limit_ram_acks(0, State) ->
+ State;
+limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }) ->
+ io:format("Limiting acks~p~n", [Quota]),
+ case gb_trees:is_empty(RAI) of
+ true ->
+ State;
+ false ->
+ {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
+ io:format("Largest~p~n", [SeqId]),
+ MsgStatus = dict:fetch(SeqId, PA),
+ State1 = maybe_write_to_disk(true, false, MsgStatus, State),
+ io:format("Wrote~n"),
+ limit_ram_acks(Quota - 1,
+ State1 #vqstate {
+ pending_ack =
+ dict:update(SeqId, {false, Guid}, PA),
+ ram_ack_index = RAI1 })
+ end.
+
+
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,