summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-27 16:21:38 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-27 16:21:38 +0100
commit3141b0e9b6edaebf69e02ceacdfec5e45494f23f (patch)
tree2b772efceec2b8f733d9e5062a0b6234b9122262
parent6b640144895f964e81bdd8d6a234fc3cfb17cf7c (diff)
downloadrabbitmq-server-3141b0e9b6edaebf69e02ceacdfec5e45494f23f.tar.gz
added peek to backing queue, implemented in vq
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_backing_queue.erl3
-rw-r--r--src/rabbit_tests.erl58
-rw-r--r--src/rabbit_variable_queue.erl13
4 files changed, 71 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 08ce0ed6..955b607f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -150,7 +150,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
init_queue_state(State) ->
- lists:foldr(fun(F, S) -> F(S) end, State,
+ lists:foldl(fun(F, S) -> F(S) end, State,
[fun init_expires/1, fun init_ttl/1]).
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 4f71c1a8..eaabc651 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -73,6 +73,9 @@ behaviour_info(callbacks) ->
%% returns true and return the new state.
{dropwhile, 2},
+ %% Peek at the next message.
+ {peek, 1},
+
%% Produce the next message.
{fetch, 2},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ee2b564d..bc99af55 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1842,9 +1842,65 @@ test_variable_queue() ->
F <- [fun test_variable_queue_dynamic_duration_change/1,
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
- fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]],
+ fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_dropwhile/1,
+ fun test_peek/1]],
passed.
+test_dropwhile(VQ0) ->
+ Count = 10,
+
+ %% add messages with sequential expiry
+ VQ1 = lists:foldl(
+ fun (N, VQN) ->
+ rabbit_variable_queue:publish(
+ rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{}, <<>>),
+ #msg_properties{expiry = N}, VQN)
+ end, VQ0, lists:seq(1, Count)),
+
+ %% drop the first 5 messages
+ VQ2 = rabbit_variable_queue:dropwhile(
+ fun(_Msg, #msg_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, VQ1),
+
+ %% fetch five now
+ VQ3 = lists:foldl(fun (_N, VQN) ->
+ {{#basic_message{}, _, _, _}, VQM} =
+ rabbit_variable_queue:fetch(false, VQN),
+ VQM
+ end, VQ2, lists:seq(1, 5)),
+
+ %% should be empty now
+ {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
+
+ VQ4.
+
+test_peek(VQ0) ->
+ Expiry = 123,
+ Body = <<"test">>,
+
+ %% publish message
+ VQ1 = rabbit_variable_queue:publish(rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{}, Body),
+ #msg_properties{ expiry = Expiry },
+ VQ0),
+
+ %% take a peek
+ {{#basic_message{ content = Content },
+ #msg_properties { expiry = Expiry}}, VQ2} =
+ rabbit_variable_queue:peek(VQ1),
+
+ {_, Body} = rabbit_basic:from_content(Content),
+
+ %% should be able to fetch still
+ {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2),
+
+ VQ3.
+
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4df4088c..bf1af596 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,7 @@
-export([init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1, dropwhile/2,
+ requeue/3, len/1, is_empty/1, dropwhile/2, peek/1,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -518,7 +518,14 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
persistent_count = PCount1,
pending_ack = PA1 })}.
-
+peek(State) ->
+ internal_queue_out(
+ fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps },
+ _, State1) ->
+ {{Msg, MsgProps}, State1}
+ end, State).
+
+
dropwhile(Pred, State) ->
case internal_queue_out(
fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps },
@@ -601,7 +608,7 @@ internal_fetch(AckRequired, Q4a,
len = Len1,
persistent_count = PCount1,
pending_ack = PA1 })}.
-
+
ack(AckTags, State) ->
a(ack(fun rabbit_msg_store:remove/2,
fun (_AckEntry, State1) -> State1 end,