diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-09-29 17:48:29 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-09-29 17:48:29 +0100 |
commit | 1ebfa127f0ebb80cfe1277decb0f00939e593ca8 (patch) | |
tree | 6bac96a83e81d26d39ba5544000f320ebac4c8cf | |
parent | 854407a2dbc5c3d129cca4983bc10b06e37c2ffe (diff) | |
download | rabbitmq-server-1ebfa127f0ebb80cfe1277decb0f00939e593ca8.tar.gz |
Refactor, rename, comment
-rw-r--r-- | src/rabbit_variable_queue.erl | 52 |
1 files changed, 21 insertions, 31 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0ef0043e..66567c0c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -573,7 +573,7 @@ ack(AckTags, State) -> requeue(AckTags, MsgPropsFun, State) -> {MsgIds, State1} = requeue_merge(lists:sort(AckTags), MsgPropsFun, State), - {MsgIds, reduce_memory_use(State1)}. + {MsgIds, a(reduce_memory_use(State1))}. len(#vqstate { len = Len }) -> Len. @@ -823,7 +823,7 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, AckedFun, IndexState) -> +betas_from_index_entries(List, TransientThreshold, PendingAcks, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}, @@ -832,7 +832,7 @@ betas_from_index_entries(List, TransientThreshold, AckedFun, IndexState) -> true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> case AckedFun(SeqId) of + false -> case dict:is_key(SeqId, PendingAcks) of false -> {[m(#msg_status { msg = undefined, msg_id = MsgId, @@ -1319,10 +1319,10 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, q3 = Q3, q4 = Q4, in_counter = InCounter, - len = Len } = State) -> - {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, q3_limit(Q3), - Q4, queue:new(), [], - q4_funs(MsgPropsFun), State), + len = Len } = State) -> + {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, beta_limit(Q3), + Q4, queue:new(), [], + q4_funs(MsgPropsFun), State), {SeqIds2, MsgIds1, Q3a, State2} = queue_merge(SeqIds1, delta_limit(Delta), Q3, bpqueue:new(), MsgIds, q3_funs(MsgPropsFun), State1), @@ -1335,26 +1335,20 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, in_counter = InCounter + MsgCount, len = Len + MsgCount }}. -queue_merge([], _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin }, - State) -> - {[], MsgIds, QJoin(Front, Q), State}; +%% Rebuild queue, inserting sequence ids to maintain ordering queue_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, State) when Limit == undefined orelse SeqId < Limit -> case QOut(Q) of - {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} -> - case SeqId1 > SeqId of - true -> {#msg_status { msg_id = MsgId } = MsgStatus1, State1} = - QPublish(SeqId, State), - queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front), - [MsgId | MsgIds], Funs, State1); - false -> queue_merge(SeqIds, Limit, Q1, - QIn(MsgStatusHead, Front), MsgIds, - Funs, State) - end; - {empty, _Q1} -> + {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} + when SeqId1 < SeqId -> + %% enqueue from the remaining queue + queue_merge(SeqIds, Limit, Q1, QIn(MsgStatusHead, Front), MsgIds, + Funs, State); + {_, _Q1} -> + %% enqueue from the remaining list of sequence ids {#msg_status { msg_id = MsgId } = MsgStatus1, State1} = QPublish(SeqId, State), queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front), @@ -1433,7 +1427,7 @@ delta_merge(SeqIds, MsgIds, #delta { start_seq_id = StartSeqId, {[MsgId | MsgIds0], Delta1, State2} end, {MsgIds, Delta, State}, SeqIds). -% Mostly opposite of record_pending_ack/2 +%% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, MsgPropsFun, #vqstate { pending_ack = PA, ram_ack_index = RAI } = State) -> @@ -1458,12 +1452,10 @@ msg_from_pending_ack(SeqId, MsgPropsFun, end, {MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}. -q3_limit(BPQ) -> - case bpqueue:is_empty(BPQ) of - true -> undefined; - false -> {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} = - bpqueue:out(BPQ), - SeqId +beta_limit(BPQ) -> + case bpqueue:out(BPQ) of + {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId; + {empty, _BPQ} -> undefined end. delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; @@ -1668,9 +1660,7 @@ maybe_deltas_to_betas(State = #vqstate { {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = - betas_from_index_entries(List, TransientThreshold, - fun (SeqId) -> dict:is_key(SeqId, PA) end, - IndexState1), + betas_from_index_entries(List, TransientThreshold, PA, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, case bpqueue:len(Q3a) of 0 -> |