summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-11 18:02:04 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-11 18:02:04 +0100
commitdf36d772d5a6d6416cec61ade2e85453a2188f37 (patch)
tree4a70b732b4237dec59e0bb370a56ce386bef1af1
parent00096f34cd13fd52a8870d2b329b1eea3aaaebc3 (diff)
downloadrabbitmq-server-df36d772d5a6d6416cec61ade2e85453a2188f37.tar.gz
Add peek and peek_r to lqueue, and use them in needs_timeout. Whilst this is better, it's still O(N) per msg, so we shouldn't actually be doing even this...
-rw-r--r--src/lqueue.erl10
-rw-r--r--src/rabbit_variable_queue.erl18
2 files changed, 18 insertions, 10 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 4a8164f6..f07eec18 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -17,7 +17,7 @@
-module(lqueue).
-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1]).
+ foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]).
-define(QUEUE, queue).
@@ -42,6 +42,8 @@
-spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
-spec(from_list/1 :: ([value()]) -> ?MODULE()).
-spec(to_list/1 :: (?MODULE()) -> [value()]).
+-spec(peek(?MODULE) -> 'empty' | {'value',value()}).
+-spec(peek_r(?MODULE) -> 'empty' | {'value',value()}).
-endif.
@@ -87,3 +89,9 @@ foldr(Fun, Init, Q) ->
len({L, _Q}) ->
L.
+
+peek({0, _Q}) -> empty;
+peek({_L, Q}) -> ?QUEUE:peek(Q).
+
+peek_r({0, _Q}) -> empty;
+peek_r({_L, Q}) -> ?QUEUE:peek_r(Q).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 60c3dfd2..775a1664 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -724,17 +724,17 @@ needs_timeout(State) ->
end.
null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) ->
- {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2),
+ {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2),
fun (SeqId) -> SeqId end) orelse
- null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3),
+ null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3),
fun rabbit_queue_index:next_segment_boundary/1),
State}.
-null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1,
- index_on_disk = true }}, _Q},
- {{value, #msg_status { seq_id = SeqId2 }}, _Q2},
+null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1,
+ index_on_disk = true }},
+ {value, #msg_status { seq_id = SeqId2 }},
LimitFun) ->
- SeqId1 >= LimitFun(SeqId2);
+ LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2);
null_gamma_delta_msg(_, _, _) ->
false.
@@ -1376,9 +1376,9 @@ msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
needs_confirming = false } }, State1}.
beta_limit(Q) ->
- case ?QUEUE:out(Q) of
- {{value, #msg_status { seq_id = SeqId }}, _Q} -> SeqId;
- {empty, _Q} -> undefined
+ case ?QUEUE:peek(Q) of
+ {value, #msg_status { seq_id = SeqId }} -> SeqId;
+ empty -> undefined
end.
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;