diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 18:02:04 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 18:02:04 +0100 |
commit | df36d772d5a6d6416cec61ade2e85453a2188f37 (patch) | |
tree | 4a70b732b4237dec59e0bb370a56ce386bef1af1 | |
parent | 00096f34cd13fd52a8870d2b329b1eea3aaaebc3 (diff) | |
download | rabbitmq-server-df36d772d5a6d6416cec61ade2e85453a2188f37.tar.gz |
Add peek and peek_r to lqueue, and use them in needs_timeout. Whilst this is better, it's still O(N) per msg, so we shouldn't actually be doing even this...
-rw-r--r-- | src/lqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 18 |
2 files changed, 18 insertions, 10 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl index 4a8164f6..f07eec18 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -17,7 +17,7 @@ -module(lqueue). -export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, - foldl/3, foldr/3, from_list/1, to_list/1]). + foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]). -define(QUEUE, queue). @@ -42,6 +42,8 @@ -spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B). -spec(from_list/1 :: ([value()]) -> ?MODULE()). -spec(to_list/1 :: (?MODULE()) -> [value()]). +-spec(peek(?MODULE) -> 'empty' | {'value',value()}). +-spec(peek_r(?MODULE) -> 'empty' | {'value',value()}). -endif. @@ -87,3 +89,9 @@ foldr(Fun, Init, Q) -> len({L, _Q}) -> L. + +peek({0, _Q}) -> empty; +peek({_L, Q}) -> ?QUEUE:peek(Q). + +peek_r({0, _Q}) -> empty; +peek_r({_L, Q}) -> ?QUEUE:peek_r(Q). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 60c3dfd2..775a1664 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -724,17 +724,17 @@ needs_timeout(State) -> end. null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) -> - {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2), + {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2), fun (SeqId) -> SeqId end) orelse - null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3), + null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3), fun rabbit_queue_index:next_segment_boundary/1), State}. -null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1, - index_on_disk = true }}, _Q}, - {{value, #msg_status { seq_id = SeqId2 }}, _Q2}, +null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1, + index_on_disk = true }}, + {value, #msg_status { seq_id = SeqId2 }}, LimitFun) -> - SeqId1 >= LimitFun(SeqId2); + LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2); null_gamma_delta_msg(_, _, _) -> false. @@ -1376,9 +1376,9 @@ msg_from_pending_ack(SeqId, MsgPropsFun, State) -> needs_confirming = false } }, State1}. beta_limit(Q) -> - case ?QUEUE:out(Q) of - {{value, #msg_status { seq_id = SeqId }}, _Q} -> SeqId; - {empty, _Q} -> undefined + case ?QUEUE:peek(Q) of + {value, #msg_status { seq_id = SeqId }} -> SeqId; + empty -> undefined end. delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; |