diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-11-23 12:28:48 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-11-23 12:28:48 +0000 |
commit | 6cddbaf0135c269e2d3be5d9b7c956cd2fcd60f4 (patch) | |
tree | 658b243959d244e17cb72881469f711539555c45 | |
parent | acd1ab8fafab77b4a93222cd8f6dce354fe1b7d8 (diff) | |
download | rabbitmq-server-6cddbaf0135c269e2d3be5d9b7c956cd2fcd60f4.tar.gz |
Refactor backing queue delta fold
-rw-r--r-- | src/rabbit_variable_queue.erl | 71 |
1 files changed, 32 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f49aa085..10ada2dd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -680,51 +680,22 @@ requeue(AckTags, #vqstate { delta = Delta, fold(Fun, Acc, #vqstate { q1 = Q1, q2 = Q2, - delta = Delta, + delta = #delta { start_seq_id = DeltaSeqId, + end_seq_id = DeltaSeqIdEnd }, q3 = Q3, - q4 = Q4} = State) -> - QFun = fun(M, {A, S}) -> - {#msg_status{msg = Msg}, State1} = read_msg(M, false, S), - A1 = Fun(Msg, A), - {A1, State1} + q4 = Q4 } = State) -> + QFun = fun(MsgStatus, {Acc0, State0}) -> + {#msg_status { msg = Msg }, State1 } = + read_msg(MsgStatus, false, State0), + Acc1 = Fun(Msg, Acc0), + {Acc1, State1} end, {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4), {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3), - {Acc3, State3} = delta_fold (Fun, Acc2, Delta, State2), + {Acc3, State3} = delta_fold (Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2), {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2), ?QUEUE:foldl(QFun, {Acc4, State4}, Q1). -delta_fold(_Fun, Acc, ?BLANK_DELTA_PATTERN(X), State) -> - {Acc, State}; -delta_fold(Fun, Acc, #delta { start_seq_id = DeltaSeqId, - end_seq_id = DeltaSeqIdEnd}, State) -> - {List, State1 = #vqstate { msg_store_clients = MSCState }} = - delta_index(DeltaSeqId, DeltaSeqIdEnd, State), - {Result, MSCState3} = - lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, _IsDelivered}, - {Acc1, MSCState1}) -> - {{ok, Msg = #basic_message {}}, MSCState2} = - msg_store_read(MSCState1, IsPersistent, MsgId), - {Fun(Msg, Acc1), MSCState2} - end, {Acc, MSCState}, List), - {Result, State1 #vqstate { msg_store_clients = MSCState3}}. - -delta_index(DeltaSeqId, DeltaSeqIdEnd, State) -> - delta_index(DeltaSeqId, DeltaSeqIdEnd, State, []). - -delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, State, List) - when DeltaSeqIdDone == DeltaSeqIdEnd -> - {List, State}; -delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, - #vqstate { index_state = IndexState } = State, List) -> - DeltaSeqId1 = lists:min( - [rabbit_queue_index:next_segment_boundary(DeltaSeqIdDone), - DeltaSeqIdEnd]), - {List1, IndexState1} = - rabbit_queue_index:read(DeltaSeqIdDone, DeltaSeqId1, IndexState), - delta_index(DeltaSeqId1, DeltaSeqIdEnd, - State #vqstate { index_state = IndexState1 }, List ++ List1). - len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -1402,7 +1373,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue +%% Internal plumbing for requeue and fold %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1471,6 +1442,28 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. +delta_fold(_Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, State) + when DeltaSeqId == DeltaSeqIdEnd -> + {Acc, State}; +delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, + #vqstate { index_state = IndexState, + msg_store_clients = MSCState } = State) -> + DeltaSeqId1 = lists:min( + [rabbit_queue_index:next_segment_boundary(DeltaSeqId), + DeltaSeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, + IndexState), + {Acc1, MSCState1} = + lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, + _IsDelivered}, {Acc0, MSCState0}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, MsgId), + {Fun(Msg, Acc0), MSCState1} + end, {Acc, MSCState}, List), + delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd, + State #vqstate { index_state = IndexState1, + msg_store_clients = MSCState1 }). + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |