diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-12 11:35:24 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-12 11:35:24 +0000 |
commit | 02f75e388797b9efb9b1535667e60904f72ab9ed (patch) | |
tree | fae8c76e8680e4a1eae9892fbe6ed76030471afa | |
parent | 4ac412f7ca65af2f328119986fee85778688b57e (diff) | |
download | rabbitmq-server-02f75e388797b9efb9b1535667e60904f72ab9ed.tar.gz |
pass State to iterator
We want to be able to zip this iterator with other iterators that also
manipulate the vqstate. Hence we must pass the State explicitly rather
than keeping it opaque inside the iterator state.
Also, some refactoring on read_msg.
-rw-r--r-- | src/rabbit_variable_queue.erl | 94 |
1 files changed, 43 insertions, 51 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac6a50af..8cb5da0b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -676,7 +676,7 @@ ackfold(MsgFun, Acc, State, AckTags) -> end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State)). +fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State), State). len(#vqstate { len = Len }) -> Len. @@ -1080,14 +1080,16 @@ queue_out(State = #vqstate { q4 = Q4 }) -> read_msg(#msg_status{msg = undefined, msg_id = MsgId, - is_persistent = IsPersistent}, - State = #vqstate{msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1}}; + is_persistent = IsPersistent}, State) -> + read_msg(MsgId, IsPersistent, State); read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. +read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, State #vqstate {msg_store_clients = MSCState1}}. + inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. @@ -1442,57 +1444,47 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% Iterator %%---------------------------------------------------------------------------- -iterator(State = #vqstate{q4 = Q4}) -> {q4, Q4, State}. - -next({q4, _, State} = It) -> next(It, q3, State#vqstate.q3); -next({q3, _, State} = It) -> next(It, delta, State#vqstate.delta); -next({delta, _, State} = It) -> next(It, q2, State#vqstate.q2); -next({q2, _, State} = It) -> next(It, q1, State#vqstate.q1); -next({q1, _, State} = It) -> next(It, done, State); -next({done, _, State}) -> {empty, State}. - -next({delta, #delta{start_seq_id = DeltaSeqId, end_seq_id = DeltaSeqId}, State}, - NextKey, Next) -> - next({NextKey, Next, State}); -next({delta, Delta = #delta{start_seq_id = DeltaSeqId, - end_seq_id = DeltaSeqIdEnd}, - State = #vqstate{index_state = IndexState}}, NextKey, Next) -> - DeltaSeqId1 = lists:min( - [rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - next({delta, {Delta#delta{start_seq_id = DeltaSeqId1}, List}, - State#vqstate{index_state = IndexState1}}, NextKey, Next); -next({delta, {Delta, []}, State}, NextKey, Next) -> - next({delta, Delta, State}, NextKey, Next); -next({delta, {Delta, [M | Rest]}, - State = #vqstate{msg_store_clients = MSCState}}, _NextKey, _Next) -> +iterator(State) -> istate(start, State). + +istate(start, State) -> {q4, State#vqstate.q4}; +istate(q4, State) -> {q3, State#vqstate.q3}; +istate(q3, State) -> {delta, State#vqstate.delta}; +istate(delta, State) -> {q2, State#vqstate.q2}; +istate(q2, State) -> {q1, State#vqstate.q1}; +istate(q1, _State) -> done. + +next(done, State) -> {empty, State}; +next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqId}}, State) -> + next(istate(delta, State), State); +next({delta, Delta = #delta{start_seq_id = SeqId, end_seq_id = SeqIdEnd}}, + State = #vqstate{index_state = IndexState}) -> + SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), + SeqId1 = lists:min([SeqIdB, SeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), + next({delta, Delta#delta{start_seq_id = SeqId1}, List}, + State#vqstate{index_state = IndexState1}); +next({delta, Delta, []}, State) -> next({delta, Delta}, State); +next({delta, Delta, [M | Rest]}, State) -> {MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered} = M, - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - State1 = State#vqstate{msg_store_clients = MSCState1}, - {value, Msg, MsgProps, {delta, {Delta, Rest}, State1}}; -next({Key, Q, State}, NextKey, Next) -> + {Msg, State1} = read_msg(MsgId, IsPersistent, State), + {value, Msg, MsgProps, {delta, Delta, Rest}, State1}; +next({Key, Q}, State) -> case ?QUEUE:out(Q) of - {empty, _Q} -> - next({NextKey, Next, State}); - {{value, MsgStatus}, QN} -> - {Msg, State1} = read_msg(MsgStatus, State), - {value, Msg, MsgStatus#msg_status.msg_props, {Key, QN, State1}} + {empty, _Q} -> next(istate(Key, State), State); + {{value, MsgStatus}, QN} -> {Msg, State1} = read_msg(MsgStatus, State), + MsgProps = MsgStatus#msg_status.msg_props, + {value, Msg, MsgProps, {Key, QN}, State1} end. -done({_, _, State}) -> State. - -ifold(Fun, Acc, It) -> - case next(It) of - {value, Msg, MsgProps, Next} -> +ifold(Fun, Acc, It, State) -> + case next(It, State) of + {value, Msg, MsgProps, Next, State1} -> case Fun(Msg, MsgProps, Acc) of - {stop, Acc1} -> {Acc1, done(Next)}; - {cont, Acc1} -> ifold(Fun, Acc1, Next) + {stop, Acc1} -> {Acc1, State1}; + {cont, Acc1} -> ifold(Fun, Acc1, Next, State1) end; - {empty, Done} -> - {Acc, Done} + {empty, State1} -> + {Acc, State1} end. %%---------------------------------------------------------------------------- |