summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-09-29 17:48:29 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-09-29 17:48:29 +0100
commit1ebfa127f0ebb80cfe1277decb0f00939e593ca8 (patch)
tree6bac96a83e81d26d39ba5544000f320ebac4c8cf
parent854407a2dbc5c3d129cca4983bc10b06e37c2ffe (diff)
downloadrabbitmq-server-1ebfa127f0ebb80cfe1277decb0f00939e593ca8.tar.gz
Refactor, rename, comment
-rw-r--r--src/rabbit_variable_queue.erl52
1 files changed, 21 insertions, 31 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0ef0043e..66567c0c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -573,7 +573,7 @@ ack(AckTags, State) ->
requeue(AckTags, MsgPropsFun, State) ->
{MsgIds, State1} = requeue_merge(lists:sort(AckTags), MsgPropsFun, State),
- {MsgIds, reduce_memory_use(State1)}.
+ {MsgIds, a(reduce_memory_use(State1))}.
len(#vqstate { len = Len }) -> Len.
@@ -823,7 +823,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, AckedFun, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, PendingAcks, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
@@ -832,7 +832,7 @@ betas_from_index_entries(List, TransientThreshold, AckedFun, IndexState) ->
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case AckedFun(SeqId) of
+ false -> case dict:is_key(SeqId, PendingAcks) of
false -> {[m(#msg_status {
msg = undefined,
msg_id = MsgId,
@@ -1319,10 +1319,10 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
q3 = Q3,
q4 = Q4,
in_counter = InCounter,
- len = Len } = State) ->
- {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, q3_limit(Q3),
- Q4, queue:new(), [],
- q4_funs(MsgPropsFun), State),
+ len = Len } = State) ->
+ {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, beta_limit(Q3),
+ Q4, queue:new(), [],
+ q4_funs(MsgPropsFun), State),
{SeqIds2, MsgIds1, Q3a, State2} = queue_merge(SeqIds1, delta_limit(Delta),
Q3, bpqueue:new(), MsgIds,
q3_funs(MsgPropsFun), State1),
@@ -1335,26 +1335,20 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }}.
-queue_merge([], _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin },
- State) ->
- {[], MsgIds, QJoin(Front, Q), State};
+%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds,
#merge_funs { out = QOut,
in = QIn,
publish = QPublish } = Funs, State)
when Limit == undefined orelse SeqId < Limit ->
case QOut(Q) of
- {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
- case SeqId1 > SeqId of
- true -> {#msg_status { msg_id = MsgId } = MsgStatus1, State1} =
- QPublish(SeqId, State),
- queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front),
- [MsgId | MsgIds], Funs, State1);
- false -> queue_merge(SeqIds, Limit, Q1,
- QIn(MsgStatusHead, Front), MsgIds,
- Funs, State)
- end;
- {empty, _Q1} ->
+ {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1}
+ when SeqId1 < SeqId ->
+ %% enqueue from the remaining queue
+ queue_merge(SeqIds, Limit, Q1, QIn(MsgStatusHead, Front), MsgIds,
+ Funs, State);
+ {_, _Q1} ->
+ %% enqueue from the remaining list of sequence ids
{#msg_status { msg_id = MsgId } = MsgStatus1, State1} =
QPublish(SeqId, State),
queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front),
@@ -1433,7 +1427,7 @@ delta_merge(SeqIds, MsgIds, #delta { start_seq_id = StartSeqId,
{[MsgId | MsgIds0], Delta1, State2}
end, {MsgIds, Delta, State}, SeqIds).
-% Mostly opposite of record_pending_ack/2
+%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, MsgPropsFun,
#vqstate { pending_ack = PA,
ram_ack_index = RAI } = State) ->
@@ -1458,12 +1452,10 @@ msg_from_pending_ack(SeqId, MsgPropsFun,
end,
{MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}.
-q3_limit(BPQ) ->
- case bpqueue:is_empty(BPQ) of
- true -> undefined;
- false -> {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} =
- bpqueue:out(BPQ),
- SeqId
+beta_limit(BPQ) ->
+ case bpqueue:out(BPQ) of
+ {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId;
+ {empty, _BPQ} -> undefined
end.
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
@@ -1668,9 +1660,7 @@ maybe_deltas_to_betas(State = #vqstate {
{List, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold,
- fun (SeqId) -> dict:is_key(SeqId, PA) end,
- IndexState1),
+ betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case bpqueue:len(Q3a) of
0 ->