summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:02:24 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:02:24 +0100
commit6ffbb36794fe101eaef101127645758ab29b4c44 (patch)
treed2b6abb0a43d39653a262bd73e6097022b1bb900
parent1ebfa127f0ebb80cfe1277decb0f00939e593ca8 (diff)
downloadrabbitmq-server-6ffbb36794fe101eaef101127645758ab29b4c44.tar.gz
a whole bunch of refactoring and cosmetic changes
- more sensible order of args to queue_merge and delta_merge: 1) things that get consumed, 2) things that get added to, 3) constants, 4) State - more sensible order of results of queue_merge and delta_merge: 1) things left over from (1) above, 2) things produced, 4) State - better var names - correct indentation - stick to <80 columns - some inlining
-rw-r--r--src/rabbit_variable_queue.erl206
1 files changed, 105 insertions, 101 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 66567c0c..a624ea33 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1320,13 +1320,15 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
q4 = Q4,
in_counter = InCounter,
len = Len } = State) ->
- {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, beta_limit(Q3),
- Q4, queue:new(), [],
+ {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4,
+ queue:new(), [],
+ beta_limit(Q3),
q4_funs(MsgPropsFun), State),
- {SeqIds2, MsgIds1, Q3a, State2} = queue_merge(SeqIds1, delta_limit(Delta),
- Q3, bpqueue:new(), MsgIds,
+ {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3,
+ bpqueue:new(), MsgIds,
+ delta_limit(Delta),
q3_funs(MsgPropsFun), State1),
- {MsgIds2, Delta1, State3} = delta_merge(SeqIds2, MsgIds1, Delta,
+ {Delta1, MsgIds2, State3} = delta_merge(SeqIds2, Delta, MsgIds1,
MsgPropsFun, State2),
MsgCount = length(MsgIds2),
{MsgIds2, State3 #vqstate { delta = Delta1,
@@ -1336,121 +1338,123 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
len = Len + MsgCount }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds,
- #merge_funs { out = QOut,
- in = QIn,
- publish = QPublish } = Funs, State)
- when Limit == undefined orelse SeqId < Limit ->
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
+ #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
+ State)
+ when Limit == undefined orelse SeqId < Limit ->
case QOut(Q) of
- {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1}
- when SeqId1 < SeqId ->
- %% enqueue from the remaining queue
- queue_merge(SeqIds, Limit, Q1, QIn(MsgStatusHead, Front), MsgIds,
- Funs, State);
+ {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
+ when SeqIdQ < SeqId ->
+ %% enqueue from the remaining queue
+ queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
+ Limit, Funs, State);
{_, _Q1} ->
- %% enqueue from the remaining list of sequence ids
- {#msg_status { msg_id = MsgId } = MsgStatus1, State1} =
- QPublish(SeqId, State),
- queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front),
- [MsgId | MsgIds], Funs, State1)
+ %% enqueue from the remaining list of sequence ids
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
+ QPublish(SeqId, State),
+ queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds],
+ Limit, Funs, State1)
end;
-queue_merge(SeqIds, _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin },
+queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
State) ->
- {SeqIds, MsgIds, QJoin(Front, Q), State}.
+ {SeqIds, QJoin(Front, Q), MsgIds, State}.
q4_funs(MsgPropsFun) ->
#merge_funs {
- join = fun queue:join/2,
- out = fun queue:out/1,
- in = fun queue:in/2,
- publish = fun (SeqId, State) ->
- {#msg_status { msg = Msg } = MsgStatus,
- #vqstate { ram_msg_count = RamMsgCount } = State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State),
- case Msg of
- undefined ->
- read_msg(MsgStatus, State1);
- #basic_message{} -> {MsgStatus,
- State1 #vqstate { ram_msg_count =
- RamMsgCount + 1}}
- end
- end}.
+ join = fun queue:join/2,
+ out = fun queue:out/1,
+ in = fun queue:in/2,
+ publish = fun (SeqId, State) ->
+ {#msg_status { msg = Msg } = MsgStatus,
+ #vqstate { ram_msg_count = RamMsgCount } = State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State),
+ case Msg of
+ undefined ->
+ read_msg(MsgStatus, State1);
+ #basic_message{} ->
+ {MsgStatus, State1 #vqstate {
+ ram_msg_count =
+ RamMsgCount + 1 }}
+ end
+ end}.
q3_funs(MsgPropsFun) ->
#merge_funs {
- join = fun bpqueue:join/2,
- out = fun (Q) ->
- case bpqueue:out(Q) of
- {{value, _IndexOnDisk, MsgStatus}, Q1} ->
- {{value, MsgStatus}, Q1};
- {empty, _Q1} = X ->
- X
- end
- end,
- in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
- bpqueue:in(IOD, MsgStatus, Q)
- end,
- publish = fun (SeqId, State) ->
- {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
- State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State),
- {#msg_status { msg = Msg,
- index_on_disk = IndexOnDisk1 } = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } = State2} =
- maybe_write_to_disk(not MsgOnDisk, false, MsgStatus,
- State1),
- {MsgStatus1,
- State2 #vqstate {
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk1),
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}
- end}.
-
-
-delta_merge([], MsgIds, Delta, _MsgPropsFun, State) ->
- {MsgIds, Delta, State};
-delta_merge(SeqIds, MsgIds, #delta { start_seq_id = StartSeqId,
- count = Count,
- end_seq_id = EndSeqId} = Delta,
- MsgPropsFun, State) ->
- lists:foldl(fun (SeqId, {MsgIds0, Delta0, State0}) ->
- {#msg_status { msg_id = MsgId,
- index_on_disk = IndexOnDisk,
- msg_on_disk = MsgOnDisk} = MsgStatus,
- State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State0),
- Delta1 = Delta0 #delta {
- start_seq_id = min(SeqId, StartSeqId),
- count = Count + 1,
- end_seq_id = max(SeqId + 1, EndSeqId) },
- {_MsgStatus, State2} = maybe_write_to_disk(
- not MsgOnDisk, not IndexOnDisk,
- MsgStatus, State1),
- {[MsgId | MsgIds0], Delta1, State2}
- end, {MsgIds, Delta, State}, SeqIds).
+ join = fun bpqueue:join/2,
+ out = fun (Q) ->
+ case bpqueue:out(Q) of
+ {{value, _IndexOnDisk, MsgStatus}, Q1} ->
+ {{value, MsgStatus}, Q1};
+ {empty, _Q1} = X ->
+ X
+ end
+ end,
+ in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
+ bpqueue:in(IOD, MsgStatus, Q)
+ end,
+ publish = fun (SeqId, State) ->
+ {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
+ State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State),
+ {#msg_status { index_on_disk = IndexOnDisk,
+ msg = Msg} = MsgStatus1,
+ #vqstate { ram_index_count = RamIndexCount,
+ ram_msg_count = RamMsgCount } =
+ State2} =
+ maybe_write_to_disk(not MsgOnDisk, false,
+ MsgStatus, State1),
+ {MsgStatus1, State2 #vqstate {
+ ram_msg_count = RamMsgCount +
+ one_if(Msg =/= undefined),
+ ram_index_count = RamIndexCount +
+ one_if(not IndexOnDisk) }}
+ end}.
+
+
+delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
+ {Delta, MsgIds, State};
+delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId} = Delta,
+ MsgIds, MsgPropsFun, State) ->
+ lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
+ {#msg_status { msg_id = MsgId,
+ index_on_disk = IndexOnDisk,
+ msg_on_disk = MsgOnDisk} = MsgStatus,
+ State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State0),
+ {_MsgStatus, State2} =
+ maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
+ MsgStatus, State1),
+ {Delta0 #delta {
+ start_seq_id = lists:min([SeqId, StartSeqId]),
+ count = Count + 1,
+ end_seq_id = lists:max([SeqId + 1, EndSeqId]) },
+ [MsgId | MsgIds0], State2}
+ end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, MsgPropsFun,
#vqstate { pending_ack = PA,
ram_ack_index = RAI } = State) ->
- MsgPropsFun1 = fun (MsgProps) ->
- (MsgPropsFun(MsgProps)) #message_properties {
- needs_confirming = false }
- end,
State1 = State #vqstate { pending_ack = dict:erase(SeqId, PA),
ram_ack_index = gb_trees:delete_any(SeqId, RAI)},
#msg_status { msg_props = MsgProps1 } = MsgStatus1 =
case dict:fetch(SeqId, PA) of
- {IsPersistent, MsgId, MsgProps, IndexOnDisk} ->
- #msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = true,
- msg_on_disk = true,
- index_on_disk = IndexOnDisk,
- msg_props = MsgProps };
- #msg_status{} = MsgStatus0 -> MsgStatus0
- end,
- {MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}.
+ {IsPersistent, MsgId, MsgProps, IndexOnDisk} ->
+ #msg_status { seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = true,
+ msg_on_disk = true,
+ index_on_disk = IndexOnDisk,
+ msg_props = MsgProps };
+ #msg_status{} = MsgStatus0 -> MsgStatus0
+ end,
+ {MsgStatus1 #msg_status {
+ msg_props = (MsgPropsFun(MsgProps1)) #message_properties {
+ needs_confirming = false } }, State1}.
beta_limit(BPQ) ->
case bpqueue:out(BPQ) of