diff options
author | Rob Harrop <rharrop@vmware.com> | 2010-09-27 16:21:38 +0100 |
---|---|---|
committer | Rob Harrop <rharrop@vmware.com> | 2010-09-27 16:21:38 +0100 |
commit | 3141b0e9b6edaebf69e02ceacdfec5e45494f23f (patch) | |
tree | 2b772efceec2b8f733d9e5062a0b6234b9122262 | |
parent | 6b640144895f964e81bdd8d6a234fc3cfb17cf7c (diff) | |
download | rabbitmq-server-3141b0e9b6edaebf69e02ceacdfec5e45494f23f.tar.gz |
added peek to backing queue, implemented in vq
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 58 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 13 |
4 files changed, 71 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 08ce0ed6..955b607f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -150,7 +150,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- init_queue_state(State) -> - lists:foldr(fun(F, S) -> F(S) end, State, + lists:foldl(fun(F, S) -> F(S) end, State, [fun init_expires/1, fun init_ttl/1]). init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 4f71c1a8..eaabc651 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -73,6 +73,9 @@ behaviour_info(callbacks) -> %% returns true and return the new state. {dropwhile, 2}, + %% Peek at the next message. + {peek, 1}, + %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ee2b564d..bc99af55 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1842,9 +1842,65 @@ test_variable_queue() -> F <- [fun test_variable_queue_dynamic_duration_change/1, fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]], + fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_dropwhile/1, + fun test_peek/1]], passed. +test_dropwhile(VQ0) -> + Count = 10, + + %% add messages with sequential expiry + VQ1 = lists:foldl( + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #msg_properties{expiry = N}, VQN) + end, VQ0, lists:seq(1, Count)), + + %% drop the first 5 messages + VQ2 = rabbit_variable_queue:dropwhile( + fun(_Msg, #msg_properties { expiry = Expiry }) -> + Expiry =< 5 + end, VQ1), + + %% fetch five now + VQ3 = lists:foldl(fun (_N, VQN) -> + {{#basic_message{}, _, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ2, lists:seq(1, 5)), + + %% should be empty now + {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + + VQ4. + +test_peek(VQ0) -> + Expiry = 123, + Body = <<"test">>, + + %% publish message + VQ1 = rabbit_variable_queue:publish(rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, Body), + #msg_properties{ expiry = Expiry }, + VQ0), + + %% take a peek + {{#basic_message{ content = Content }, + #msg_properties { expiry = Expiry}}, VQ2} = + rabbit_variable_queue:peek(VQ1), + + {_, Body} = rabbit_basic:from_content(Content), + + %% should be able to fetch still + {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2), + + VQ3. + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4df4088c..bf1af596 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + requeue/3, len/1, is_empty/1, dropwhile/2, peek/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -518,7 +518,14 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, persistent_count = PCount1, pending_ack = PA1 })}. - +peek(State) -> + internal_queue_out( + fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, + _, State1) -> + {{Msg, MsgProps}, State1} + end, State). + + dropwhile(Pred, State) -> case internal_queue_out( fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, @@ -601,7 +608,7 @@ internal_fetch(AckRequired, Q4a, len = Len1, persistent_count = PCount1, pending_ack = PA1 })}. - + ack(AckTags, State) -> a(ack(fun rabbit_msg_store:remove/2, fun (_AckEntry, State1) -> State1 end, |