diff options
author | Rob Harrop <rharrop@vmware.com> | 2010-09-22 18:37:40 +0100 |
---|---|---|
committer | Rob Harrop <rharrop@vmware.com> | 2010-09-22 18:37:40 +0100 |
commit | f8174a443dd985bbf0dac128b21c7d5d84a66499 (patch) | |
tree | 806472840bd0da73628b5da13ff74b592dd72c6f | |
parent | 85ce982ea685c47f6546957466f826549452dff0 (diff) | |
download | rabbitmq-server-f8174a443dd985bbf0dac128b21c7d5d84a66499.tar.gz |
reworked how message filtering works with dropwhile
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 132 |
7 files changed, 116 insertions, 79 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f417c6d9..3e78d571 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -30,9 +30,8 @@ %% -type(fetch_result() :: - %% Message, MessageProperties, IsDelivered, AckTag, Remaining_Len + %% Message, IsDelivered, AckTag, Remaining_Len ('empty'|{rabbit_types:basic_message(), - rabbit_types:msg_properties(), boolean(), ack(), non_neg_integer()})). -type(is_durable() :: boolean()). @@ -54,6 +53,9 @@ -spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), rabbit_types:msg_properties(), state()) -> {ack(), state()}). +-spec(dropwhile/2 :: + (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties()) + -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1aa1d05f..d92dd586 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -406,9 +406,10 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, + #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. @@ -451,17 +452,24 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; - {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> - case msg_expired(MsgProperties) of - true -> - fetch(AckRequired, State#q{backing_queue_state = BQS1}); - false -> - {{Message, IsDelivered, AckTag, Remaining}, - State#q{backing_queue_state = BQS1}} - end + {empty, BQS1} -> + {empty, State#q{backing_queue_state = BQS1}}; + {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + {{Message, IsDelivered, AckTag, Remaining}, + State#q{backing_queue_state = BQS1}} end. +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + BQS1 = BQ:dropwhile( + fun (_Msg, _MsgProperties = #msg_properties{expiry = undefined}) -> + false; + (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(os:timestamp(), {0,0,0}), + Now > Expiry + end, BQS), + State #q{backing_queue_state = BQS1}. + add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> @@ -579,12 +587,6 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). -msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) -> - false; -msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> - Now = timer:now_diff(now(), {0,0,0}), - Now > Expiry. - reset_msg_expiry_fun(State) -> fun(MsgProps) -> MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} @@ -748,7 +750,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, State1) of + case fetch(AckRequired, drop_expired_messages(State1)) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 5cb78368..4f71c1a8 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -69,6 +69,10 @@ behaviour_info(callbacks) -> %% (i.e. saves the round trip through the backing queue). {publish_delivered, 4}, + %% Drop messages in the queue while the supplied predicate + %% returns true and return the new state. + {dropwhile, 2}, + %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 152d2a87..4626b513 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -32,8 +32,8 @@ -module(rabbit_invariable_queue). -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, - tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, + dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -118,6 +118,9 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. +dropwhile(Pred, State) -> + State. + fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; fetch(AckRequired, State = #iv_state { len = Len, diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index dad81873..6c501fc0 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -69,7 +69,7 @@ -type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). -type(work_item() :: - {publish, rabbit_types:message(), pmsg()} | + {publish, rabbit_types:message(), rabbit_types:msg_properties(), pmsg()} | {deliver, pmsg()} | {ack, pmsg()}). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 08ae0d6c..ee2b564d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1815,7 +1815,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - _Props, IsDelivered, AckTagN, Rem}, VQM} = + IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -1878,7 +1878,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = + {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). @@ -1943,7 +1943,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), - {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} = + {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), @@ -1989,7 +1989,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true), - {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 202f2c99..4df4088c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, + requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -518,64 +518,90 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, persistent_count = PCount1, pending_ack = PA1 })}. -fetch(AckRequired, State = #vqstate { q4 = Q4, - ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + +dropwhile(Pred, State) -> + case internal_queue_out( + fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, + Q4a, State1) -> + case Pred(Msg, MsgProps) of + true -> + {_, State2} = internal_fetch(false, Q4a, + MsgStatus, State1), + dropwhile(Pred, State2); + false -> + State1 + end + end, State) of + {empty, State2} -> State2; + State2 -> State2 + end. + +fetch(AckRequired, State) -> + internal_queue_out( + fun(MsgStatus, Q4a, State1) -> + internal_fetch(AckRequired, Q4a, MsgStatus, State1) + end, State). + +internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_to_q4(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, State1} -> fetch(AckRequired, State1) + {empty, State1} = Result -> a(State1), Result; + {loaded, State1} -> internal_queue_out(Fun, State1) end; - {{value, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, - msg_properties = MsgProperties }}, - Q4a} -> - - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - MsgStore = find_msg_store(IsPersistent), - Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - {{Msg, MsgProperties, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, - ram_msg_count = RamMsgCount - 1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })} + {{value, Value}, Q4a} -> + %% don't automatically overwrite the state with the popped + %% queue because some callbacks choose to rollback the pop + %% of the message from the queue + Fun(Value, Q4a, State) end. +internal_fetch(AckRequired, Q4a, + MsgStatus = #msg_status { + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State = #vqstate { + ram_msg_count = RamMsgCount, out_counter = OutCount, + index_state = IndexState, len = Len, persistent_count = PCount, + pending_ack = PA }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + MsgStore = find_msg_store(IsPersistent), + Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, PA1} = case AckRequired of + true -> PA2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, PA), + {SeqId, PA2}; + false -> {blank_ack, PA} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + {{Msg, IsDelivered, AckTag, Len1}, + a(State #vqstate { q4 = Q4a, + ram_msg_count = RamMsgCount - 1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1, + pending_ack = PA1 })}. + ack(AckTags, State) -> a(ack(fun rabbit_msg_store:remove/2, fun (_AckEntry, State1) -> State1 end, |