summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-23 12:28:48 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-23 12:28:48 +0000
commit6cddbaf0135c269e2d3be5d9b7c956cd2fcd60f4 (patch)
tree658b243959d244e17cb72881469f711539555c45
parentacd1ab8fafab77b4a93222cd8f6dce354fe1b7d8 (diff)
downloadrabbitmq-server-6cddbaf0135c269e2d3be5d9b7c956cd2fcd60f4.tar.gz
Refactor backing queue delta fold
-rw-r--r--src/rabbit_variable_queue.erl71
1 files changed, 32 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f49aa085..10ada2dd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -680,51 +680,22 @@ requeue(AckTags, #vqstate { delta = Delta,
fold(Fun, Acc, #vqstate { q1 = Q1,
q2 = Q2,
- delta = Delta,
+ delta = #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd },
q3 = Q3,
- q4 = Q4} = State) ->
- QFun = fun(M, {A, S}) ->
- {#msg_status{msg = Msg}, State1} = read_msg(M, false, S),
- A1 = Fun(Msg, A),
- {A1, State1}
+ q4 = Q4 } = State) ->
+ QFun = fun(MsgStatus, {Acc0, State0}) ->
+ {#msg_status { msg = Msg }, State1 } =
+ read_msg(MsgStatus, false, State0),
+ Acc1 = Fun(Msg, Acc0),
+ {Acc1, State1}
end,
{Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
{Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
- {Acc3, State3} = delta_fold (Fun, Acc2, Delta, State2),
+ {Acc3, State3} = delta_fold (Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2),
{Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
?QUEUE:foldl(QFun, {Acc4, State4}, Q1).
-delta_fold(_Fun, Acc, ?BLANK_DELTA_PATTERN(X), State) ->
- {Acc, State};
-delta_fold(Fun, Acc, #delta { start_seq_id = DeltaSeqId,
- end_seq_id = DeltaSeqIdEnd}, State) ->
- {List, State1 = #vqstate { msg_store_clients = MSCState }} =
- delta_index(DeltaSeqId, DeltaSeqIdEnd, State),
- {Result, MSCState3} =
- lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, _IsDelivered},
- {Acc1, MSCState1}) ->
- {{ok, Msg = #basic_message {}}, MSCState2} =
- msg_store_read(MSCState1, IsPersistent, MsgId),
- {Fun(Msg, Acc1), MSCState2}
- end, {Acc, MSCState}, List),
- {Result, State1 #vqstate { msg_store_clients = MSCState3}}.
-
-delta_index(DeltaSeqId, DeltaSeqIdEnd, State) ->
- delta_index(DeltaSeqId, DeltaSeqIdEnd, State, []).
-
-delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, State, List)
- when DeltaSeqIdDone == DeltaSeqIdEnd ->
- {List, State};
-delta_index(DeltaSeqIdDone, DeltaSeqIdEnd,
- #vqstate { index_state = IndexState } = State, List) ->
- DeltaSeqId1 = lists:min(
- [rabbit_queue_index:next_segment_boundary(DeltaSeqIdDone),
- DeltaSeqIdEnd]),
- {List1, IndexState1} =
- rabbit_queue_index:read(DeltaSeqIdDone, DeltaSeqId1, IndexState),
- delta_index(DeltaSeqId1, DeltaSeqIdEnd,
- State #vqstate { index_state = IndexState1 }, List ++ List1).
-
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -1402,7 +1373,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
+%% Internal plumbing for requeue and fold
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1471,6 +1442,28 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+delta_fold(_Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, State)
+ when DeltaSeqId == DeltaSeqIdEnd ->
+ {Acc, State};
+delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState } = State) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Acc1, MSCState1} =
+ lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent,
+ _IsDelivered}, {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {Fun(Msg, Acc0), MSCState1}
+ end, {Acc, MSCState}, List),
+ delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------