summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-09-29 17:09:09 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-09-29 17:09:09 +0100
commit6cca6fae7f00d19738f830684a6020f7975c284d (patch)
tree85a9b3a7f6fbdbb15799e0ae31692d7e67db6d82
parent13c0d2951bfeb113a9329eafbb073294e2a8b698 (diff)
downloadrabbitmq-server-6cca6fae7f00d19738f830684a6020f7975c284d.tar.gz
Count the quota properly on beta -> delta. Also, fprof shows that the queue:len calls in permitted_beta_count can be very expensive. So wrap the whole of queue in lqueue and associated refactoring.
-rw-r--r--src/lqueue.erl45
-rw-r--r--src/rabbit_variable_queue.erl103
2 files changed, 97 insertions, 51 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
new file mode 100644
index 00000000..8bbd52d9
--- /dev/null
+++ b/src/lqueue.erl
@@ -0,0 +1,45 @@
+-module(lqueue).
+
+-compile([export_all]).
+
+-define(QUEUE, queue).
+
+new() -> {0, ?QUEUE:new()}.
+
+is_empty({0, _Q}) ->
+ true;
+is_empty(_) ->
+ false.
+
+in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}.
+in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}.
+
+out({0, _Q} = Q) ->
+ {empty, Q};
+out({L, Q}) ->
+ {Result, Q1} = ?QUEUE:out(Q),
+ {Result, {L-1, Q1}}.
+
+out_r({0, _Q} = Q) ->
+ {empty, Q};
+out_r({L, Q}) ->
+ {Result, Q1} = ?QUEUE:out_r(Q),
+ {Result, {L-1, Q1}}.
+
+join({L1, Q1}, {L2, Q2}) ->
+ {L1 + L2, ?QUEUE:join(Q1, Q2)}.
+
+to_list({_L, Q}) ->
+ ?QUEUE:to_list(Q).
+
+from_list(L) ->
+ {length(L), ?QUEUE:from_list(L)}.
+
+queue_fold(Fun, Init, Q) ->
+ case out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
+
+len({L, _Q}) ->
+ L.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e5e03133..15a4fa1d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -290,6 +290,7 @@
-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(QUEUE, lqueue).
-include("rabbit.hrl").
@@ -479,19 +480,19 @@ purge(State = #vqstate { q4 = Q4,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
{LensByStore, IndexState1} = remove_queue_entries(
- fun rabbit_misc:queue_fold/3, Q4,
+ fun ?QUEUE:queue_fold/3, Q4,
orddict:new(), IndexState, MSCState),
{LensByStore1, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = queue:new(),
+ State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
{LensByStore2, IndexState3} = remove_queue_entries(
- fun rabbit_misc:queue_fold/3, Q1,
+ fun ?QUEUE:queue_fold/3, Q1,
LensByStore1, IndexState2, MSCState1),
PCount1 = PCount - find_persistent_count(LensByStore2),
- {Len, a(State1 #vqstate { q1 = queue:new(),
+ {Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
@@ -707,11 +708,11 @@ status(#vqstate {
avg_ingress = AvgIngressRate },
ack_rates = #rates { avg_egress = AvgAckEgressRate,
avg_ingress = AvgAckIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , queue:len(Q2)},
+ [ {q1 , ?QUEUE:len(Q1)},
+ {q2 , ?QUEUE:len(Q2)},
{delta , Delta},
- {q3 , queue:len(Q3)},
- {q4 , queue:len(Q4)},
+ {q3 , ?QUEUE:len(Q3)},
+ {q4 , ?QUEUE:len(Q4)},
{len , Len},
{pending_acks , dict:size(PA)},
{target_ram_count , TargetRamCount},
@@ -740,11 +741,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
persistent_count = PersistentCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }) ->
- E1 = queue:is_empty(Q1),
- E2 = queue:is_empty(Q2),
+ E1 = ?QUEUE:is_empty(Q1),
+ E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
- E3 = queue:is_empty(Q3),
- E4 = queue:is_empty(Q4),
+ E3 = ?QUEUE:is_empty(Q3),
+ E4 = ?QUEUE:is_empty(Q4),
LZ = Len == 0,
true = E1 or not E3,
@@ -848,7 +849,7 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> {queue:in_r(
+ false -> {?QUEUE:in_r(
m(#msg_status { msg = undefined,
msg_id = MsgId,
seq_id = SeqId,
@@ -861,7 +862,7 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
Delivers1,
Acks1}
end
- end, {queue:new(), [], []}, List),
+ end, {?QUEUE:new(), [], []}, List),
{Filtered,
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
@@ -892,7 +893,7 @@ combine_deltas(#delta { start_seq_id = StartLow,
andalso ((StartLow + Count) =< EndHigh),
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
-beta_fold(Fun, Init, Q) -> rabbit_misc:queue_fold(Fun, Init, Q).
+beta_fold(Fun, Init, Q) -> ?QUEUE:queue_fold(Fun, Init, Q).
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -915,11 +916,11 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = queue:new(),
+ q1 = ?QUEUE:new(),
+ q2 = ?QUEUE:new(),
delta = Delta,
- q3 = queue:new(),
- q4 = queue:new(),
+ q3 = ?QUEUE:new(),
+ q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = dict:new(),
ram_ack_index = gb_trees:empty(),
@@ -959,19 +960,19 @@ blank_rate(Timestamp, IngressLength) ->
in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
- case queue:is_empty(Q4) of
+ case ?QUEUE:is_empty(Q4) of
true -> State #vqstate {
- q3 = queue:in_r(MsgStatus, Q3),
+ q3 = ?QUEUE:in_r(MsgStatus, Q3),
ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) }
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
- State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+ State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
queue_out(State = #vqstate { q4 = Q4 }) ->
- case queue:out(Q4) of
+ case ?QUEUE:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3(State) of
{empty, _State1} = Result -> Result;
@@ -1050,7 +1051,7 @@ purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- case queue:is_empty(Q3) of
+ case ?QUEUE:is_empty(Q3) of
true -> {LensByStore, State};
false -> {LensByStore1, IndexState1} =
remove_queue_entries(fun beta_fold/3, Q3,
@@ -1058,7 +1059,7 @@ purge_betas_and_deltas(LensByStore,
purge_betas_and_deltas(LensByStore1,
maybe_deltas_to_betas(
State #vqstate {
- q3 = queue:new(),
+ q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
@@ -1109,9 +1110,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case queue:is_empty(Q3) of
- false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
+ State2 = case ?QUEUE:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
@@ -1419,9 +1420,9 @@ permitted_beta_count(#vqstate { len = Len,
q1 = Q1,
q3 = Q3,
q4 = Q4 }) ->
- BetaDeltaLen = Len - queue:len(Q1) - queue:len(Q4),
+ BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len),
- case queue:out(Q3) of
+ case ?QUEUE:out(Q3) of
{empty, _Q3} -> Permitted;
{{value, #msg_status { seq_id = MinSeqId }}, _Q3} ->
lists:max([Permitted,
@@ -1441,7 +1442,7 @@ fetch_from_q3(State = #vqstate {
q3 = Q3,
q4 = Q4,
ram_index_count = RamIndexCount}) ->
- case queue:out(Q3) of
+ case ?QUEUE:out(Q3) of
{empty, _Q3} ->
{empty, State};
{{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} ->
@@ -1450,16 +1451,16 @@ fetch_from_q3(State = #vqstate {
State1 = State #vqstate { q3 = Q3a,
ram_index_count = RamIndexCount1 },
State2 =
- case {queue:is_empty(Q3a), 0 == DeltaCount} of
+ case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
%% still empty. So q2 must be empty, and we
%% know q4 is empty otherwise we wouldn't be
%% loading from q3. As such, we can just set
%% q4 to Q1.
- true = queue:is_empty(Q2), %% ASSERTION
- true = queue:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = queue:new(),
+ true = ?QUEUE:is_empty(Q2), %% ASSERTION
+ true = ?QUEUE:is_empty(Q4), %% ASSERTION
+ State1 #vqstate { q1 = ?QUEUE:new(),
q4 = Q1 };
{true, false} ->
maybe_deltas_to_betas(State1);
@@ -1491,7 +1492,7 @@ maybe_deltas_to_betas(State = #vqstate {
{Q3a, IndexState2} =
betas_from_index_entries(List, TransientThreshold, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
- case queue:len(Q3a) of
+ case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
@@ -1499,14 +1500,14 @@ maybe_deltas_to_betas(State = #vqstate {
State1 #vqstate {
delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
Q3aLen ->
- Q3b = queue:join(Q3, Q3a),
+ Q3b = ?QUEUE:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { q2 = queue:new(),
+ State1 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
- q3 = queue:join(Q3b, Q2) };
+ q3 = ?QUEUE:join(Q3b, Q2) };
N when N > 0 ->
Delta1 = #delta { start_seq_id = DeltaSeqId1,
count = N,
@@ -1523,21 +1524,21 @@ push_alphas_to_betas(Quota, State) ->
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
- fun queue:out/1,
+ fun ?QUEUE:out/1,
fun (MsgStatus, Q1a,
State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
State1 #vqstate { q1 = Q1a,
- q3 = queue:in(MsgStatus, Q3) };
+ q3 = ?QUEUE:in(MsgStatus, Q3) };
(MsgStatus, Q1a, State1 = #vqstate { q2 = Q2 }) ->
State1 #vqstate { q1 = Q1a,
- q2 = queue:in(MsgStatus, Q2) }
+ q2 = ?QUEUE:in(MsgStatus, Q2) }
end, Quota, Q1, State).
maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
- fun queue:out_r/1,
+ fun ?QUEUE:out_r/1,
fun (MsgStatus, Q4a, State1 = #vqstate { q3 = Q3 }) ->
- State1 #vqstate { q3 = queue:in_r(MsgStatus, Q3),
+ State1 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3),
q4 = Q4a }
end, Quota, Q4, State).
@@ -1575,11 +1576,11 @@ push_betas_to_deltas(Quota,
ram_index_count = RamIndexCount }) ->
{Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} =
push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun queue:out/1, Quota, Q2,
+ fun ?QUEUE:out/1, Quota, Q2,
RamIndexCount, IndexState),
{_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} =
push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
- fun queue:out_r/1, Quota1, Q3,
+ fun ?QUEUE:out_r/1, Quota1, Q3,
RamIndexCount2, IndexState2),
Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
State #vqstate { q2 = Q2a,
@@ -1589,11 +1590,11 @@ push_betas_to_deltas(Quota,
ram_index_count = RamIndexCount3 }.
push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) ->
- case queue:out(Q) of
+ case ?QUEUE:out(Q) of
{empty, _Q} ->
{Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
{{value, #msg_status { seq_id = MinSeqId }}, _Qa} ->
- {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = queue:out_r(Q),
+ {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
@@ -1607,8 +1608,8 @@ push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) -
end
end.
-push_betas_to_deltas(_Generator, _Limit, Quota, Q, Quota, RamIndexCount, IndexState) ->
- {Quota, Q, RamIndexCount, IndexState};
+push_betas_to_deltas(_Generator, _Limit, 0, Q, Count, RamIndexCount, IndexState) ->
+ {Count, Q, RamIndexCount, IndexState};
push_betas_to_deltas(Generator, Limit, Quota, Q, Count, RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, _Q} ->