diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-07 12:02:55 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-07 12:02:55 +0000 |
commit | ae4f73f6cfff2bda3eaf5afb75c32f9b281f297c (patch) | |
tree | 43e9e690e101e47b0b9a6e6e782d2c490d2a22a3 | |
parent | ecc8ccfe87a92265a3714c3632ead6fef7f34139 (diff) | |
download | rabbitmq-server-ae4f73f6cfff2bda3eaf5afb75c32f9b281f297c.tar.gz |
trigger rate calculation in all the right placesbug26004
-rw-r--r-- | src/rabbit_variable_queue.erl | 46 |
1 files changed, 26 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 321af4ac..73e9f6b5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -558,10 +558,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount1, persistent_count = PCount1, unconfirmed = UC1 }), - a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of - true -> update_rates(State3); - false -> State3 - end)). + a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -579,12 +576,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - {SeqId, a(reduce_memory_use( - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }))}. + State3 = State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }, + {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, State) -> State. @@ -704,11 +701,12 @@ requeue(AckTags, #vqstate { delta = Delta, State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( - State3 #vqstate { delta = Delta1, - q3 = Q3a, - q4 = Q4a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount }))}. + maybe_update_rates( + State3 #vqstate { delta = Delta1, + q3 = Q3a, + q4 = Q4a, + in_counter = InCounter + MsgCount, + len = Len + MsgCount })))}. ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = @@ -755,6 +753,13 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). +maybe_update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount }) + when InCount + OutCount > ?MSGS_PER_RATE_CALC -> + update_rates(State); +maybe_update_rates(State) -> + State. + update_rates(State = #vqstate{ in_counter = InCount, out_counter = OutCount, ack_in_counter = AckInCount, @@ -1179,11 +1184,12 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1}}. + {AckTag, maybe_update_rates( + State1 #vqstate {ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len - 1, + persistent_count = PCount1})}. purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, |