diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-09-29 17:09:09 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-09-29 17:09:09 +0100 |
commit | 6cca6fae7f00d19738f830684a6020f7975c284d (patch) | |
tree | 85a9b3a7f6fbdbb15799e0ae31692d7e67db6d82 | |
parent | 13c0d2951bfeb113a9329eafbb073294e2a8b698 (diff) | |
download | rabbitmq-server-6cca6fae7f00d19738f830684a6020f7975c284d.tar.gz |
Count the quota properly on beta -> delta. Also, fprof shows that the queue:len calls in permitted_beta_count can be very expensive. So wrap the whole of queue in lqueue and associated refactoring.
-rw-r--r-- | src/lqueue.erl | 45 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 103 |
2 files changed, 97 insertions, 51 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl new file mode 100644 index 00000000..8bbd52d9 --- /dev/null +++ b/src/lqueue.erl @@ -0,0 +1,45 @@ +-module(lqueue). + +-compile([export_all]). + +-define(QUEUE, queue). + +new() -> {0, ?QUEUE:new()}. + +is_empty({0, _Q}) -> + true; +is_empty(_) -> + false. + +in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}. +in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}. + +out({0, _Q} = Q) -> + {empty, Q}; +out({L, Q}) -> + {Result, Q1} = ?QUEUE:out(Q), + {Result, {L-1, Q1}}. + +out_r({0, _Q} = Q) -> + {empty, Q}; +out_r({L, Q}) -> + {Result, Q1} = ?QUEUE:out_r(Q), + {Result, {L-1, Q1}}. + +join({L1, Q1}, {L2, Q2}) -> + {L1 + L2, ?QUEUE:join(Q1, Q2)}. + +to_list({_L, Q}) -> + ?QUEUE:to_list(Q). + +from_list(L) -> + {length(L), ?QUEUE:from_list(L)}. + +queue_fold(Fun, Init, Q) -> + case out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +len({L, _Q}) -> + L. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e5e03133..15a4fa1d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -290,6 +290,7 @@ -define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(QUEUE, lqueue). -include("rabbit.hrl"). @@ -479,19 +480,19 @@ purge(State = #vqstate { q4 = Q4, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( - fun rabbit_misc:queue_fold/3, Q4, + fun ?QUEUE:queue_fold/3, Q4, orddict:new(), IndexState, MSCState), {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2, msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = queue:new(), + State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( - fun rabbit_misc:queue_fold/3, Q1, + fun ?QUEUE:queue_fold/3, Q1, LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), - {Len, a(State1 #vqstate { q1 = queue:new(), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, ram_msg_count = 0, @@ -707,11 +708,11 @@ status(#vqstate { avg_ingress = AvgIngressRate }, ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , queue:len(Q2)}, + [ {q1 , ?QUEUE:len(Q1)}, + {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, - {q3 , queue:len(Q3)}, - {q4 , queue:len(Q4)}, + {q3 , ?QUEUE:len(Q3)}, + {q4 , ?QUEUE:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, {target_ram_count , TargetRamCount}, @@ -740,11 +741,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, persistent_count = PersistentCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }) -> - E1 = queue:is_empty(Q1), - E2 = queue:is_empty(Q2), + E1 = ?QUEUE:is_empty(Q1), + E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, - E3 = queue:is_empty(Q3), - E4 = queue:is_empty(Q4), + E3 = ?QUEUE:is_empty(Q3), + E4 = ?QUEUE:is_empty(Q4), LZ = Len == 0, true = E1 or not E3, @@ -848,7 +849,7 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> {queue:in_r( + false -> {?QUEUE:in_r( m(#msg_status { msg = undefined, msg_id = MsgId, seq_id = SeqId, @@ -861,7 +862,7 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> Delivers1, Acks1} end - end, {queue:new(), [], []}, List), + end, {?QUEUE:new(), [], []}, List), {Filtered, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. @@ -892,7 +893,7 @@ combine_deltas(#delta { start_seq_id = StartLow, andalso ((StartLow + Count) =< EndHigh), #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. -beta_fold(Fun, Init, Q) -> rabbit_misc:queue_fold(Fun, Init, Q). +beta_fold(Fun, Init, Q) -> ?QUEUE:queue_fold(Fun, Init, Q). update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -915,11 +916,11 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = queue:new(), + q1 = ?QUEUE:new(), + q2 = ?QUEUE:new(), delta = Delta, - q3 = queue:new(), - q4 = queue:new(), + q3 = ?QUEUE:new(), + q4 = ?QUEUE:new(), next_seq_id = NextSeqId, pending_ack = dict:new(), ram_ack_index = gb_trees:empty(), @@ -959,19 +960,19 @@ blank_rate(Timestamp, IngressLength) -> in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - case queue:is_empty(Q4) of + case ?QUEUE:is_empty(Q4) of true -> State #vqstate { - q3 = queue:in_r(MsgStatus, Q3), + q3 = ?QUEUE:in_r(MsgStatus, Q3), ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) } end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. queue_out(State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of + case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; @@ -1050,7 +1051,7 @@ purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> - case queue:is_empty(Q3) of + case ?QUEUE:is_empty(Q3) of true -> {LensByStore, State}; false -> {LensByStore1, IndexState1} = remove_queue_entries(fun beta_fold/3, Q3, @@ -1058,7 +1059,7 @@ purge_betas_and_deltas(LensByStore, purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { - q3 = queue:new(), + q3 = ?QUEUE:new(), index_state = IndexState1 })) end. @@ -1109,9 +1110,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case queue:is_empty(Q3) of - false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } + State2 = case ?QUEUE:is_empty(Q3) of + false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), @@ -1419,9 +1420,9 @@ permitted_beta_count(#vqstate { len = Len, q1 = Q1, q3 = Q3, q4 = Q4 }) -> - BetaDeltaLen = Len - queue:len(Q1) - queue:len(Q4), + BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len), - case queue:out(Q3) of + case ?QUEUE:out(Q3) of {empty, _Q3} -> Permitted; {{value, #msg_status { seq_id = MinSeqId }}, _Q3} -> lists:max([Permitted, @@ -1441,7 +1442,7 @@ fetch_from_q3(State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount}) -> - case queue:out(Q3) of + case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} -> @@ -1450,16 +1451,16 @@ fetch_from_q3(State = #vqstate { State1 = State #vqstate { q3 = Q3a, ram_index_count = RamIndexCount1 }, State2 = - case {queue:is_empty(Q3a), 0 == DeltaCount} of + case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and we %% know q4 is empty otherwise we wouldn't be %% loading from q3. As such, we can just set %% q4 to Q1. - true = queue:is_empty(Q2), %% ASSERTION - true = queue:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = queue:new(), + true = ?QUEUE:is_empty(Q2), %% ASSERTION + true = ?QUEUE:is_empty(Q4), %% ASSERTION + State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; {true, false} -> maybe_deltas_to_betas(State1); @@ -1491,7 +1492,7 @@ maybe_deltas_to_betas(State = #vqstate { {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, - case queue:len(Q3a) of + case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold @@ -1499,14 +1500,14 @@ maybe_deltas_to_betas(State = #vqstate { State1 #vqstate { delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); Q3aLen -> - Q3b = queue:join(Q3, Q3a), + Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { q2 = queue:new(), + State1 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, - q3 = queue:join(Q3b, Q2) }; + q3 = ?QUEUE:join(Q3b, Q2) }; N when N > 0 -> Delta1 = #delta { start_seq_id = DeltaSeqId1, count = N, @@ -1523,21 +1524,21 @@ push_alphas_to_betas(Quota, State) -> maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( - fun queue:out/1, + fun ?QUEUE:out/1, fun (MsgStatus, Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State1 #vqstate { q1 = Q1a, - q3 = queue:in(MsgStatus, Q3) }; + q3 = ?QUEUE:in(MsgStatus, Q3) }; (MsgStatus, Q1a, State1 = #vqstate { q2 = Q2 }) -> State1 #vqstate { q1 = Q1a, - q2 = queue:in(MsgStatus, Q2) } + q2 = ?QUEUE:in(MsgStatus, Q2) } end, Quota, Q1, State). maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( - fun queue:out_r/1, + fun ?QUEUE:out_r/1, fun (MsgStatus, Q4a, State1 = #vqstate { q3 = Q3 }) -> - State1 #vqstate { q3 = queue:in_r(MsgStatus, Q3), + State1 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } end, Quota, Q4, State). @@ -1575,11 +1576,11 @@ push_betas_to_deltas(Quota, ram_index_count = RamIndexCount }) -> {Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} = push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, - fun queue:out/1, Quota, Q2, + fun ?QUEUE:out/1, Quota, Q2, RamIndexCount, IndexState), {_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} = push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, - fun queue:out_r/1, Quota1, Q3, + fun ?QUEUE:out_r/1, Quota1, Q3, RamIndexCount2, IndexState2), Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), State #vqstate { q2 = Q2a, @@ -1589,11 +1590,11 @@ push_betas_to_deltas(Quota, ram_index_count = RamIndexCount3 }. push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) -> - case queue:out(Q) of + case ?QUEUE:out(Q) of {empty, _Q} -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; {{value, #msg_status { seq_id = MinSeqId }}, _Qa} -> - {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = queue:out_r(Q), + {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; @@ -1607,8 +1608,8 @@ push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) - end end. -push_betas_to_deltas(_Generator, _Limit, Quota, Q, Quota, RamIndexCount, IndexState) -> - {Quota, Q, RamIndexCount, IndexState}; +push_betas_to_deltas(_Generator, _Limit, 0, Q, Count, RamIndexCount, IndexState) -> + {Count, Q, RamIndexCount, IndexState}; push_betas_to_deltas(Generator, Limit, Quota, Q, Count, RamIndexCount, IndexState) -> case Generator(Q) of {empty, _Q} -> |