summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-03 14:30:39 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-03 14:30:39 +0000
commita14752efdaac19e4f771673e06c26c7b2037db39 (patch)
tree4d4f40e5e48fe5b2766f91ddf161b82c28b463e2
parent3499cb05a36e41b456a807935811e097bd4a676d (diff)
downloadrabbitmq-server-a14752efdaac19e4f771673e06c26c7b2037db39.tar.gz
Roughly working ack shedding
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_variable_queue.erl37
2 files changed, 27 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 83b360b8..9699ac32 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -969,6 +969,7 @@ handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
io:format("RamDuration~p~n", [RamDuration]),
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ io:format("Desired duration:~p~n", [DesiredDuration]),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
noreply(State#q{rate_timer_ref = just_measured,
backing_queue_state = BQS2});
@@ -1025,11 +1026,15 @@ handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
+handle_post_hibernate(_) ->
+ io:format("hello~n").
+
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
+ io:format("Hibernating~p~n", [self()]),
BQS1 = BQ:handle_pre_hibernate(BQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ad62248a..491d4e38 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -722,7 +722,7 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCountPrev,
ram_ack_index = RamAckIndex,
ram_ack_count_prev = RamAckCountPrev }) ->
- io:format("Setting ram duration~n"),
+ io:format("Ram duration~n"),
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
@@ -766,6 +766,7 @@ needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
{Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
fun (_Quota, State1) -> State1 end,
fun (State1) -> State1 end,
+ fun (_Quota, State1) -> State1 end,
State),
Res;
needs_idle_timeout(_State) ->
@@ -779,6 +780,7 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
pending_ack = PA,
+ ram_ack_index = RAI,
on_sync = #sync { funs = From },
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
@@ -787,7 +789,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
persistent_count = PersistentCount,
rates = #rates {
avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate } }) ->
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates {
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate } }) ->
[ {q1 , queue:len(Q1)},
{q2 , bpqueue:len(Q2)},
{delta , Delta},
@@ -795,6 +800,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
{q4 , queue:len(Q4)},
{len , Len},
{pending_acks , dict:size(PA)},
+ {ram_ack_count , gb_trees:size(RAI)},
{outstanding_txns , length(From)},
{target_ram_msg_count , TargetRamMsgCount},
{ram_msg_count , RamMsgCount},
@@ -802,7 +808,9 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_egress_rate , AvgEgressRate},
- {avg_ingress_rate , AvgIngressRate} ].
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate},
+ {avg_ack_ingress_rate , AvgAckIngressRate}].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -1342,9 +1350,8 @@ find_persistent_count(LensByStore) ->
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
- %%io:format("Reducing mem~p~p~n", [State #vqstate.target_ram_msg_count, State #vqstate.ram_msg_count]),
- {ReduceAck, State1} = {false, State}, %%reduce_ack_memory_use(State),
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) ->
+ {ReduceAck, State1} = reduce_ack_memory_use(AckFun, State),
{Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count,
State1 #vqstate.target_ram_msg_count) of
0 -> {ReduceAck, State1};
@@ -1360,9 +1367,9 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
end
end.
-reduce_ack_memory_use(State = #vqstate { target_ram_msg_count = infinity }) ->
+reduce_ack_memory_use(_AckFun, State = #vqstate { target_ram_msg_count = infinity }) ->
{false, State};
-reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount,
+reduce_ack_memory_use(AckFun, State = #vqstate {target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
ram_ack_index = RamAckIndex} ) ->
io:format("RAI:~p,TRMC:~p,RMC:~p~n", [gb_trees:size(RamAckIndex), TargetRamMsgCount, RamMsgCount]),
@@ -1372,7 +1379,7 @@ reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount
end,
case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of
0 -> {false, State};
- C -> {true, limit_ram_acks(C, State)}
+ C -> {true, AckFun(C, State)}
end.
@@ -1381,20 +1388,19 @@ limit_ram_acks(0, State) ->
State;
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- io:format("Limiting acks~p~n", [Quota]),
case gb_trees:is_empty(RAI) of
true ->
State;
false ->
{SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
- io:format("Largest~p~n", [SeqId]),
- MsgStatus = dict:fetch(SeqId, PA),
- State1 = maybe_write_to_disk(true, false, MsgStatus, State),
- io:format("Wrote~n"),
+ MsgStatus = #msg_status {
+ guid = Guid, %% ASSERTION
+ msg_props = MsgProps } = dict:fetch(SeqId, PA),
+ {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
limit_ram_acks(Quota - 1,
State1 #vqstate {
pending_ack =
- dict:update(SeqId, {false, Guid}, PA),
+ dict:store(SeqId, {false, Guid, MsgProps}, PA),
ram_ack_index = RAI1 })
end.
@@ -1403,6 +1409,7 @@ reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,
fun push_betas_to_deltas/1,
+ fun limit_ram_acks/2,
State),
State1.