diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:02:24 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:02:24 +0100 |
commit | 6ffbb36794fe101eaef101127645758ab29b4c44 (patch) | |
tree | d2b6abb0a43d39653a262bd73e6097022b1bb900 | |
parent | 1ebfa127f0ebb80cfe1277decb0f00939e593ca8 (diff) | |
download | rabbitmq-server-6ffbb36794fe101eaef101127645758ab29b4c44.tar.gz |
a whole bunch of refactoring and cosmetic changes
- more sensible order of args to queue_merge and delta_merge:
1) things that get consumed,
2) things that get added to,
3) constants,
4) State
- more sensible order of results of queue_merge and delta_merge:
1) things left over from (1) above,
2) things produced,
4) State
- better var names
- correct indentation
- stick to <80 columns
- some inlining
-rw-r--r-- | src/rabbit_variable_queue.erl | 206 |
1 files changed, 105 insertions, 101 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 66567c0c..a624ea33 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1320,13 +1320,15 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, q4 = Q4, in_counter = InCounter, len = Len } = State) -> - {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, beta_limit(Q3), - Q4, queue:new(), [], + {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4, + queue:new(), [], + beta_limit(Q3), q4_funs(MsgPropsFun), State), - {SeqIds2, MsgIds1, Q3a, State2} = queue_merge(SeqIds1, delta_limit(Delta), - Q3, bpqueue:new(), MsgIds, + {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3, + bpqueue:new(), MsgIds, + delta_limit(Delta), q3_funs(MsgPropsFun), State1), - {MsgIds2, Delta1, State3} = delta_merge(SeqIds2, MsgIds1, Delta, + {Delta1, MsgIds2, State3} = delta_merge(SeqIds2, Delta, MsgIds1, MsgPropsFun, State2), MsgCount = length(MsgIds2), {MsgIds2, State3 #vqstate { delta = Delta1, @@ -1336,121 +1338,123 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, len = Len + MsgCount }}. %% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, - #merge_funs { out = QOut, - in = QIn, - publish = QPublish } = Funs, State) - when Limit == undefined orelse SeqId < Limit -> +queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, + #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, + State) + when Limit == undefined orelse SeqId < Limit -> case QOut(Q) of - {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} - when SeqId1 < SeqId -> - %% enqueue from the remaining queue - queue_merge(SeqIds, Limit, Q1, QIn(MsgStatusHead, Front), MsgIds, - Funs, State); + {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} + when SeqIdQ < SeqId -> + %% enqueue from the remaining queue + queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds, + Limit, Funs, State); {_, _Q1} -> - %% enqueue from the remaining list of sequence ids - {#msg_status { msg_id = MsgId } = MsgStatus1, State1} = - QPublish(SeqId, State), - queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front), - [MsgId | MsgIds], Funs, State1) + %% enqueue from the remaining list of sequence ids + {#msg_status { msg_id = MsgId } = MsgStatus, State1} = + QPublish(SeqId, State), + queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds], + Limit, Funs, State1) end; -queue_merge(SeqIds, _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin }, +queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, State) -> - {SeqIds, MsgIds, QJoin(Front, Q), State}. + {SeqIds, QJoin(Front, Q), MsgIds, State}. q4_funs(MsgPropsFun) -> #merge_funs { - join = fun queue:join/2, - out = fun queue:out/1, - in = fun queue:in/2, - publish = fun (SeqId, State) -> - {#msg_status { msg = Msg } = MsgStatus, - #vqstate { ram_msg_count = RamMsgCount } = State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State), - case Msg of - undefined -> - read_msg(MsgStatus, State1); - #basic_message{} -> {MsgStatus, - State1 #vqstate { ram_msg_count = - RamMsgCount + 1}} - end - end}. + join = fun queue:join/2, + out = fun queue:out/1, + in = fun queue:in/2, + publish = fun (SeqId, State) -> + {#msg_status { msg = Msg } = MsgStatus, + #vqstate { ram_msg_count = RamMsgCount } = State1} = + msg_from_pending_ack(SeqId, MsgPropsFun, State), + case Msg of + undefined -> + read_msg(MsgStatus, State1); + #basic_message{} -> + {MsgStatus, State1 #vqstate { + ram_msg_count = + RamMsgCount + 1 }} + end + end}. q3_funs(MsgPropsFun) -> #merge_funs { - join = fun bpqueue:join/2, - out = fun (Q) -> - case bpqueue:out(Q) of - {{value, _IndexOnDisk, MsgStatus}, Q1} -> - {{value, MsgStatus}, Q1}; - {empty, _Q1} = X -> - X - end - end, - in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) -> - bpqueue:in(IOD, MsgStatus, Q) - end, - publish = fun (SeqId, State) -> - {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, - State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State), - {#msg_status { msg = Msg, - index_on_disk = IndexOnDisk1 } = MsgStatus1, - #vqstate { ram_index_count = RamIndexCount, - ram_msg_count = RamMsgCount } = State2} = - maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, - State1), - {MsgStatus1, - State2 #vqstate { - ram_index_count = RamIndexCount + one_if(not IndexOnDisk1), - ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }} - end}. - - -delta_merge([], MsgIds, Delta, _MsgPropsFun, State) -> - {MsgIds, Delta, State}; -delta_merge(SeqIds, MsgIds, #delta { start_seq_id = StartSeqId, - count = Count, - end_seq_id = EndSeqId} = Delta, - MsgPropsFun, State) -> - lists:foldl(fun (SeqId, {MsgIds0, Delta0, State0}) -> - {#msg_status { msg_id = MsgId, - index_on_disk = IndexOnDisk, - msg_on_disk = MsgOnDisk} = MsgStatus, - State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State0), - Delta1 = Delta0 #delta { - start_seq_id = min(SeqId, StartSeqId), - count = Count + 1, - end_seq_id = max(SeqId + 1, EndSeqId) }, - {_MsgStatus, State2} = maybe_write_to_disk( - not MsgOnDisk, not IndexOnDisk, - MsgStatus, State1), - {[MsgId | MsgIds0], Delta1, State2} - end, {MsgIds, Delta, State}, SeqIds). + join = fun bpqueue:join/2, + out = fun (Q) -> + case bpqueue:out(Q) of + {{value, _IndexOnDisk, MsgStatus}, Q1} -> + {{value, MsgStatus}, Q1}; + {empty, _Q1} = X -> + X + end + end, + in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) -> + bpqueue:in(IOD, MsgStatus, Q) + end, + publish = fun (SeqId, State) -> + {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, + State1} = + msg_from_pending_ack(SeqId, MsgPropsFun, State), + {#msg_status { index_on_disk = IndexOnDisk, + msg = Msg} = MsgStatus1, + #vqstate { ram_index_count = RamIndexCount, + ram_msg_count = RamMsgCount } = + State2} = + maybe_write_to_disk(not MsgOnDisk, false, + MsgStatus, State1), + {MsgStatus1, State2 #vqstate { + ram_msg_count = RamMsgCount + + one_if(Msg =/= undefined), + ram_index_count = RamIndexCount + + one_if(not IndexOnDisk) }} + end}. + + +delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> + {Delta, MsgIds, State}; +delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId} = Delta, + MsgIds, MsgPropsFun, State) -> + lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> + {#msg_status { msg_id = MsgId, + index_on_disk = IndexOnDisk, + msg_on_disk = MsgOnDisk} = MsgStatus, + State1} = + msg_from_pending_ack(SeqId, MsgPropsFun, State0), + {_MsgStatus, State2} = + maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk, + MsgStatus, State1), + {Delta0 #delta { + start_seq_id = lists:min([SeqId, StartSeqId]), + count = Count + 1, + end_seq_id = lists:max([SeqId + 1, EndSeqId]) }, + [MsgId | MsgIds0], State2} + end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, MsgPropsFun, #vqstate { pending_ack = PA, ram_ack_index = RAI } = State) -> - MsgPropsFun1 = fun (MsgProps) -> - (MsgPropsFun(MsgProps)) #message_properties { - needs_confirming = false } - end, State1 = State #vqstate { pending_ack = dict:erase(SeqId, PA), ram_ack_index = gb_trees:delete_any(SeqId, RAI)}, #msg_status { msg_props = MsgProps1 } = MsgStatus1 = case dict:fetch(SeqId, PA) of - {IsPersistent, MsgId, MsgProps, IndexOnDisk} -> - #msg_status { seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = true, - msg_on_disk = true, - index_on_disk = IndexOnDisk, - msg_props = MsgProps }; - #msg_status{} = MsgStatus0 -> MsgStatus0 - end, - {MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}. + {IsPersistent, MsgId, MsgProps, IndexOnDisk} -> + #msg_status { seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = true, + msg_on_disk = true, + index_on_disk = IndexOnDisk, + msg_props = MsgProps }; + #msg_status{} = MsgStatus0 -> MsgStatus0 + end, + {MsgStatus1 #msg_status { + msg_props = (MsgPropsFun(MsgProps1)) #message_properties { + needs_confirming = false } }, State1}. beta_limit(BPQ) -> case bpqueue:out(BPQ) of |