summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-02-07 12:02:55 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-02-07 12:02:55 +0000
commitae4f73f6cfff2bda3eaf5afb75c32f9b281f297c (patch)
tree43e9e690e101e47b0b9a6e6e782d2c490d2a22a3
parentecc8ccfe87a92265a3714c3632ead6fef7f34139 (diff)
downloadrabbitmq-server-bug26004.tar.gz
trigger rate calculation in all the right placesbug26004
-rw-r--r--src/rabbit_variable_queue.erl46
1 files changed, 26 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 321af4ac..73e9f6b5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -558,10 +558,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
in_counter = InCount1,
persistent_count = PCount1,
unconfirmed = UC1 }),
- a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of
- true -> update_rates(State3);
- false -> State3
- end)).
+ a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -579,12 +576,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- {SeqId, a(reduce_memory_use(
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }))}.
+ State3 = State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 },
+ {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, State) -> State.
@@ -704,11 +701,12 @@ requeue(AckTags, #vqstate { delta = Delta,
State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
- State3 #vqstate { delta = Delta1,
- q3 = Q3a,
- q4 = Q4a,
- in_counter = InCounter + MsgCount,
- len = Len + MsgCount }))}.
+ maybe_update_rates(
+ State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount })))}.
ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
@@ -755,6 +753,13 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
+maybe_update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount })
+ when InCount + OutCount > ?MSGS_PER_RATE_CALC ->
+ update_rates(State);
+maybe_update_rates(State) ->
+ State.
+
update_rates(State = #vqstate{ in_counter = InCount,
out_counter = OutCount,
ack_in_counter = AckInCount,
@@ -1179,11 +1184,12 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1}}.
+ {AckTag, maybe_update_rates(
+ State1 #vqstate {ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1})}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,