diff options
author | Rob Harrop <rharrop@vmware.com> | 2010-09-28 16:45:58 +0100 |
---|---|---|
committer | Rob Harrop <rharrop@vmware.com> | 2010-09-28 16:45:58 +0100 |
commit | b80dff2227f4159df59bbb9691b96dd2c30e2072 (patch) | |
tree | 65d52a4a5972f4bdf4d86d6f6dc665a4563d2df3 | |
parent | fd3581c6165e7e6356789f295d4910a6fc0330d3 (diff) | |
download | rabbitmq-server-b80dff2227f4159df59bbb9691b96dd2c30e2072.tar.gz |
removed peek, and restructured dropwhile to not load message content from disk
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 31 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 112 |
6 files changed, 69 insertions, 97 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 6067ac62..f750fbb2 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -56,9 +56,8 @@ (ack_required(), rabbit_types:basic_message(), rabbit_types:msg_properties(), state()) -> {ack(), state()}). -spec(dropwhile/2 :: - (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties()) - -> boolean()), state()) -> state()). --spec(peek/1 :: (state()) -> {peek_result(), state()}). + (fun ((rabbit_types:msg_properties()) -> boolean()), state()) + -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 52663f15..4b4153e0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -411,7 +411,8 @@ deliver_from_queue_deliver(AckRequired, false, State) -> run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, - #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), + #q{backing_queue = BQ, backing_queue_state = BQS} = + drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. @@ -598,7 +599,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = timer:now_diff(now(), {0,0,0}), BQS1 = BQ:dropwhile( - fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> + fun (_MsgProperties = #msg_properties{expiry=Expiry}) -> Now > Expiry end, BQS), ensure_ttl_timer(State #q{backing_queue_state = BQS1}). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index eaabc651..4f71c1a8 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -73,9 +73,6 @@ behaviour_info(callbacks) -> %% returns true and return the new state. {dropwhile, 2}, - %% Peek at the next message. - {peek, 1}, - %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index b62544fa..feb7c7e1 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_invariable_queue). -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, peek/1, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -118,17 +118,11 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. -peek(State = #iv_state { len = 0 }) -> - {empty, State}; -peek(State = #iv_state { queue = Q}) -> - {value, {Msg, MsgProps, _IsDelivered}} = queue:peek(Q), - {{Msg, MsgProps}, State}. - dropwhile(_Pred, State = #iv_state { len = 0 }) -> State; dropwhile(Pred, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - case Pred(Msg, MsgProps) of + case Pred(MsgProps) of true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 37b0916b..430a79d9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1843,8 +1843,7 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_dropwhile/1, - fun test_peek/1]], + fun test_dropwhile/1]], passed. test_dropwhile(VQ0) -> @@ -1862,7 +1861,7 @@ test_dropwhile(VQ0) -> %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( - fun(_Msg, #msg_properties { expiry = Expiry }) -> + fun(#msg_properties { expiry = Expiry }) -> Expiry =< 5 end, VQ1), @@ -1878,32 +1877,6 @@ test_dropwhile(VQ0) -> VQ4. -test_peek(VQ0) -> - Expiry = 123, - Body = <<"test">>, - - %% publish message - VQ1 = rabbit_variable_queue:publish(rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, Body), - #msg_properties{ expiry = Expiry }, - VQ0), - - %% take a peek - {{#basic_message{ content = Content }, - #msg_properties { expiry = Expiry}}, VQ2} = - rabbit_variable_queue:peek(VQ1), - - {_, Body} = rabbit_basic:from_content(Content), - - %% should be able to fetch still - {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2), - - %% should be empty now - {empty, VQ4} = rabbit_variable_queue:peek(VQ3), - - VQ4. - test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bf1af596..7d584026 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, peek/1, + requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -517,53 +517,71 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, in_counter = InCount + 1, persistent_count = PCount1, pending_ack = PA1 })}. - -peek(State) -> - internal_queue_out( - fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, - _, State1) -> - {{Msg, MsgProps}, State1} - end, State). - dropwhile(Pred, State) -> case internal_queue_out( - fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, - Q4a, State1) -> - case Pred(Msg, MsgProps) of + fun(MsgStatus = #msg_status { msg_properties = MsgProps }, + State1) -> + case Pred(MsgProps) of true -> - {_, State2} = internal_fetch(false, Q4a, - MsgStatus, State1), + {_, State2} = internal_fetch(false, + MsgStatus, State1), dropwhile(Pred, State2); false -> - State1 + %% message needs to go back into Q4 (or + %% maybe go in for the first time if it was + %% loaded from Q3). Also the msg contents + %% might not be in RAM, so read them in now + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + read_msg(MsgStatus, State1), + State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)} end end, State) of - {empty, State2} -> State2; - State2 -> State2 + {empty, StateR} -> StateR; + StateR -> StateR end. fetch(AckRequired, State) -> internal_queue_out( - fun(MsgStatus, Q4a, State1) -> - internal_fetch(AckRequired, Q4a, MsgStatus, State1) + fun(MsgStatus, State1) -> + %% it's possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + internal_fetch(AckRequired, MsgStatus1, State2) end, State). internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> - case fetch_from_q3_to_q4(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, State1} -> internal_queue_out(Fun, State1) + case fetch_from_q3(State) of + {empty, State1} = Result -> a(State1), Result; + {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) end; - {{value, Value}, Q4a} -> - %% don't automatically overwrite the state with the popped - %% queue because some callbacks choose to rollback the pop - %% of the message from the queue - Fun(Value, Q4a, State) + {{value, MsgStatus}, Q4a} -> + Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. -internal_fetch(AckRequired, Q4a, +read_msg(MsgStatus = #msg_status { msg = undefined, + guid = Guid, + index_on_disk = IndexOnDisk, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + read_from_msg_store(MSCState, IsPersistent, Guid), + + RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), + true = RamIndexCount1 >= 0, %% ASSERTION + + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + ram_index_count = RamIndexCount1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. + +internal_fetch(AckRequired, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -601,8 +619,7 @@ internal_fetch(AckRequired, Q4a, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, - ram_msg_count = RamMsgCount - 1, + a(State #vqstate { ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, @@ -1288,40 +1305,31 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). -fetch_from_q3_to_q4(State = #vqstate { +fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, - q4 = Q4, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - msg_store_clients = MSCState }) -> + q4 = Q4 }) -> case bpqueue:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, IndexOnDisk, MsgStatus = #msg_status { - msg = undefined, guid = Guid, - is_persistent = IsPersistent }}, Q3a} -> - {{ok, Msg = #basic_message {}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), - Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4), - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, - q4 = Q4a, - ram_msg_count = RamMsgCount + 1, - ram_index_count = RamIndexCount1, - msg_store_clients = MSCState1 }, + {{value, _IndexOnDisk, MsgStatus}, Q3a} -> + + State1 = State #vqstate { q3 = Q3a}, + State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is - %% still empty. So q2 must be empty, and q1 - %% can now be joined onto q4 + %% still empty. So q2 must be empty, and we + %% know q4 is empty otherwise we wouldn't be + %% loading from q3. As such, we can just set + %% q4 to Q1. true = bpqueue:is_empty(Q2), %% ASSERTION + true = queue:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = queue:new(), - q4 = queue:join(Q4a, Q1) }; + q4 = Q1 }; {true, false} -> maybe_deltas_to_betas(State1); {false, _} -> @@ -1330,7 +1338,7 @@ fetch_from_q3_to_q4(State = #vqstate { %% delta and q3 are maintained State1 end, - {loaded, State2} + {loaded, {MsgStatus, State2}} end. maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> |