summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-28 16:45:58 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-28 16:45:58 +0100
commitb80dff2227f4159df59bbb9691b96dd2c30e2072 (patch)
tree65d52a4a5972f4bdf4d86d6f6dc665a4563d2df3
parentfd3581c6165e7e6356789f295d4910a6fc0330d3 (diff)
downloadrabbitmq-server-b80dff2227f4159df59bbb9691b96dd2c30e2072.tar.gz
removed peek, and restructured dropwhile to not load message content from disk
-rw-r--r--include/rabbit_backing_queue_spec.hrl5
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_backing_queue.erl3
-rw-r--r--src/rabbit_invariable_queue.erl10
-rw-r--r--src/rabbit_tests.erl31
-rw-r--r--src/rabbit_variable_queue.erl112
6 files changed, 69 insertions, 97 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 6067ac62..f750fbb2 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -56,9 +56,8 @@
(ack_required(), rabbit_types:basic_message(),
rabbit_types:msg_properties(), state()) -> {ack(), state()}).
-spec(dropwhile/2 ::
- (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties())
- -> boolean()), state()) -> state()).
--spec(peek/1 :: (state()) -> {peek_result(), state()}).
+ (fun ((rabbit_types:msg_properties()) -> boolean()), state())
+ -> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 ::
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 52663f15..4b4153e0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -411,7 +411,8 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
- #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State),
+ #q{backing_queue = BQ, backing_queue_state = BQS} =
+ drop_expired_messages(State),
IsEmpty = BQ:is_empty(BQS),
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
@@ -598,7 +599,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
Now = timer:now_diff(now(), {0,0,0}),
BQS1 = BQ:dropwhile(
- fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) ->
+ fun (_MsgProperties = #msg_properties{expiry=Expiry}) ->
Now > Expiry
end, BQS),
ensure_ttl_timer(State #q{backing_queue_state = BQS1}).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index eaabc651..4f71c1a8 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -73,9 +73,6 @@ behaviour_info(callbacks) ->
%% returns true and return the new state.
{dropwhile, 2},
- %% Peek at the next message.
- {peek, 1},
-
%% Produce the next message.
{fetch, 2},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index b62544fa..feb7c7e1 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -32,7 +32,7 @@
-module(rabbit_invariable_queue).
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
- publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, peek/1,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -118,17 +118,11 @@ publish_delivered(true, Msg = #basic_message { guid = Guid },
ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
-peek(State = #iv_state { len = 0 }) ->
- {empty, State};
-peek(State = #iv_state { queue = Q}) ->
- {value, {Msg, MsgProps, _IsDelivered}} = queue:peek(Q),
- {{Msg, MsgProps}, State}.
-
dropwhile(_Pred, State = #iv_state { len = 0 }) ->
State;
dropwhile(Pred, State = #iv_state { queue = Q }) ->
{{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
- case Pred(Msg, MsgProps) of
+ case Pred(MsgProps) of
true ->
{_, State1} =
fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 37b0916b..430a79d9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1843,8 +1843,7 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
- fun test_dropwhile/1,
- fun test_peek/1]],
+ fun test_dropwhile/1]],
passed.
test_dropwhile(VQ0) ->
@@ -1862,7 +1861,7 @@ test_dropwhile(VQ0) ->
%% drop the first 5 messages
VQ2 = rabbit_variable_queue:dropwhile(
- fun(_Msg, #msg_properties { expiry = Expiry }) ->
+ fun(#msg_properties { expiry = Expiry }) ->
Expiry =< 5
end, VQ1),
@@ -1878,32 +1877,6 @@ test_dropwhile(VQ0) ->
VQ4.
-test_peek(VQ0) ->
- Expiry = 123,
- Body = <<"test">>,
-
- %% publish message
- VQ1 = rabbit_variable_queue:publish(rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{}, Body),
- #msg_properties{ expiry = Expiry },
- VQ0),
-
- %% take a peek
- {{#basic_message{ content = Content },
- #msg_properties { expiry = Expiry}}, VQ2} =
- rabbit_variable_queue:peek(VQ1),
-
- {_, Body} = rabbit_basic:from_content(Content),
-
- %% should be able to fetch still
- {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2),
-
- %% should be empty now
- {empty, VQ4} = rabbit_variable_queue:peek(VQ3),
-
- VQ4.
-
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bf1af596..7d584026 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,7 @@
-export([init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1, dropwhile/2, peek/1,
+ requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -517,53 +517,71 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
in_counter = InCount + 1,
persistent_count = PCount1,
pending_ack = PA1 })}.
-
-peek(State) ->
- internal_queue_out(
- fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps },
- _, State1) ->
- {{Msg, MsgProps}, State1}
- end, State).
-
dropwhile(Pred, State) ->
case internal_queue_out(
- fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps },
- Q4a, State1) ->
- case Pred(Msg, MsgProps) of
+ fun(MsgStatus = #msg_status { msg_properties = MsgProps },
+ State1) ->
+ case Pred(MsgProps) of
true ->
- {_, State2} = internal_fetch(false, Q4a,
- MsgStatus, State1),
+ {_, State2} = internal_fetch(false,
+ MsgStatus, State1),
dropwhile(Pred, State2);
false ->
- State1
+ %% message needs to go back into Q4 (or
+ %% maybe go in for the first time if it was
+ %% loaded from Q3). Also the msg contents
+ %% might not be in RAM, so read them in now
+ {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
+ read_msg(MsgStatus, State1),
+ State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)}
end
end, State) of
- {empty, State2} -> State2;
- State2 -> State2
+ {empty, StateR} -> StateR;
+ StateR -> StateR
end.
fetch(AckRequired, State) ->
internal_queue_out(
- fun(MsgStatus, Q4a, State1) ->
- internal_fetch(AckRequired, Q4a, MsgStatus, State1)
+ fun(MsgStatus, State1) ->
+ %% it's possible that the message wasn't read from disk
+ %% at this point, so read it in.
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ internal_fetch(AckRequired, MsgStatus1, State2)
end, State).
internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->
- case fetch_from_q3_to_q4(State) of
- {empty, State1} = Result -> a(State1), Result;
- {loaded, State1} -> internal_queue_out(Fun, State1)
+ case fetch_from_q3(State) of
+ {empty, State1} = Result -> a(State1), Result;
+ {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1)
end;
- {{value, Value}, Q4a} ->
- %% don't automatically overwrite the state with the popped
- %% queue because some callbacks choose to rollback the pop
- %% of the message from the queue
- Fun(Value, Q4a, State)
+ {{value, MsgStatus}, Q4a} ->
+ Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
-internal_fetch(AckRequired, Q4a,
+read_msg(MsgStatus = #msg_status { msg = undefined,
+ guid = Guid,
+ index_on_disk = IndexOnDisk,
+ is_persistent = IsPersistent },
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ read_from_msg_store(MSCState, IsPersistent, Guid),
+
+ RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
+ true = RamIndexCount1 >= 0, %% ASSERTION
+
+ {MsgStatus #msg_status { msg = Msg },
+ State #vqstate { ram_msg_count = RamMsgCount + 1,
+ ram_index_count = RamIndexCount1,
+ msg_store_clients = MSCState1 }};
+read_msg(MsgStatus, State) ->
+ {MsgStatus, State}.
+
+internal_fetch(AckRequired,
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
@@ -601,8 +619,7 @@ internal_fetch(AckRequired, Q4a,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
- a(State #vqstate { q4 = Q4a,
- ram_msg_count = RamMsgCount - 1,
+ a(State #vqstate { ram_msg_count = RamMsgCount - 1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len1,
@@ -1288,40 +1305,31 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
-fetch_from_q3_to_q4(State = #vqstate {
+fetch_from_q3(State = #vqstate {
q1 = Q1,
q2 = Q2,
delta = #delta { count = DeltaCount },
q3 = Q3,
- q4 = Q4,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- msg_store_clients = MSCState }) ->
+ q4 = Q4 }) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, IndexOnDisk, MsgStatus = #msg_status {
- msg = undefined, guid = Guid,
- is_persistent = IsPersistent }}, Q3a} ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
- Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4),
- RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
- true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a,
- q4 = Q4a,
- ram_msg_count = RamMsgCount + 1,
- ram_index_count = RamIndexCount1,
- msg_store_clients = MSCState1 },
+ {{value, _IndexOnDisk, MsgStatus}, Q3a} ->
+
+ State1 = State #vqstate { q3 = Q3a},
+
State2 =
case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
- %% still empty. So q2 must be empty, and q1
- %% can now be joined onto q4
+ %% still empty. So q2 must be empty, and we
+ %% know q4 is empty otherwise we wouldn't be
+ %% loading from q3. As such, we can just set
+ %% q4 to Q1.
true = bpqueue:is_empty(Q2), %% ASSERTION
+ true = queue:is_empty(Q4), %% ASSERTION
State1 #vqstate { q1 = queue:new(),
- q4 = queue:join(Q4a, Q1) };
+ q4 = Q1 };
{true, false} ->
maybe_deltas_to_betas(State1);
{false, _} ->
@@ -1330,7 +1338,7 @@ fetch_from_q3_to_q4(State = #vqstate {
%% delta and q3 are maintained
State1
end,
- {loaded, State2}
+ {loaded, {MsgStatus, State2}}
end.
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->