summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-22 18:37:40 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-22 18:37:40 +0100
commitf8174a443dd985bbf0dac128b21c7d5d84a66499 (patch)
tree806472840bd0da73628b5da13ff74b592dd72c6f
parent85ce982ea685c47f6546957466f826549452dff0 (diff)
downloadrabbitmq-server-f8174a443dd985bbf0dac128b21c7d5d84a66499.tar.gz
reworked how message filtering works with dropwhile
-rw-r--r--include/rabbit_backing_queue_spec.hrl6
-rw-r--r--src/rabbit_amqqueue_process.erl36
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_invariable_queue.erl7
-rw-r--r--src/rabbit_persister.erl2
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl132
7 files changed, 116 insertions, 79 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index f417c6d9..3e78d571 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -30,9 +30,8 @@
%%
-type(fetch_result() ::
- %% Message, MessageProperties, IsDelivered, AckTag, Remaining_Len
+ %% Message, IsDelivered, AckTag, Remaining_Len
('empty'|{rabbit_types:basic_message(),
- rabbit_types:msg_properties(),
boolean(), ack(),
non_neg_integer()})).
-type(is_durable() :: boolean()).
@@ -54,6 +53,9 @@
-spec(publish_delivered/4 ::
(ack_required(), rabbit_types:basic_message(),
rabbit_types:msg_properties(), state()) -> {ack(), state()}).
+-spec(dropwhile/2 ::
+ (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties())
+ -> boolean()), state()) -> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 ::
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1aa1d05f..d92dd586 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -406,9 +406,10 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
+ #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State),
IsEmpty = BQ:is_empty(BQS),
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
@@ -451,17 +452,24 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}};
- {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
- case msg_expired(MsgProperties) of
- true ->
- fetch(AckRequired, State#q{backing_queue_state = BQS1});
- false ->
- {{Message, IsDelivered, AckTag, Remaining},
- State#q{backing_queue_state = BQS1}}
- end
+ {empty, BQS1} ->
+ {empty, State#q{backing_queue_state = BQS1}};
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ {{Message, IsDelivered, AckTag, Remaining},
+ State#q{backing_queue_state = BQS1}}
end.
+drop_expired_messages(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ BQS1 = BQ:dropwhile(
+ fun (_Msg, _MsgProperties = #msg_properties{expiry = undefined}) ->
+ false;
+ (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) ->
+ Now = timer:now_diff(os:timestamp(), {0,0,0}),
+ Now > Expiry
+ end, BQS),
+ State #q{backing_queue_state = BQS1}.
+
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
remove_consumer(ChPid, ConsumerTag, Queue) ->
@@ -579,12 +587,6 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
-msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) ->
- false;
-msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) ->
- Now = timer:now_diff(now(), {0,0,0}),
- Now > Expiry.
-
reset_msg_expiry_fun(State) ->
fun(MsgProps) ->
MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
@@ -748,7 +750,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, State1) of
+ case fetch(AckRequired, drop_expired_messages(State1)) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 5cb78368..4f71c1a8 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -69,6 +69,10 @@ behaviour_info(callbacks) ->
%% (i.e. saves the round trip through the backing queue).
{publish_delivered, 4},
+ %% Drop messages in the queue while the supplied predicate
+ %% returns true and return the new state.
+ {dropwhile, 2},
+
%% Produce the next message.
{fetch, 2},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 152d2a87..4626b513 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -32,8 +32,8 @@
-module(rabbit_invariable_queue).
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
- publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
- tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
+ dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -118,6 +118,9 @@ publish_delivered(true, Msg = #basic_message { guid = Guid },
ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
+dropwhile(Pred, State) ->
+ State.
+
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
fetch(AckRequired, State = #iv_state { len = Len,
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index dad81873..6c501fc0 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -69,7 +69,7 @@
-type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
-type(work_item() ::
- {publish, rabbit_types:message(), pmsg()} |
+ {publish, rabbit_types:message(), rabbit_types:msg_properties(), pmsg()} |
{deliver, pmsg()} |
{ack, pmsg()}).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 08ae0d6c..ee2b564d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1815,7 +1815,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- _Props, IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN, Rem}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -1878,7 +1878,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} =
+ {{_Msg, false, AckTag, Len}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
@@ -1943,7 +1943,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
- {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} =
+ {{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
@@ -1989,7 +1989,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true),
- {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
rabbit_amqqueue:internal_delete(QName)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 202f2c99..4df4088c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,7 @@
-export([init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1,
+ requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -518,64 +518,90 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
persistent_count = PCount1,
pending_ack = PA1 })}.
-fetch(AckRequired, State = #vqstate { q4 = Q4,
- ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- len = Len,
- persistent_count = PCount,
- pending_ack = PA }) ->
+
+dropwhile(Pred, State) ->
+ case internal_queue_out(
+ fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps },
+ Q4a, State1) ->
+ case Pred(Msg, MsgProps) of
+ true ->
+ {_, State2} = internal_fetch(false, Q4a,
+ MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false ->
+ State1
+ end
+ end, State) of
+ {empty, State2} -> State2;
+ State2 -> State2
+ end.
+
+fetch(AckRequired, State) ->
+ internal_queue_out(
+ fun(MsgStatus, Q4a, State1) ->
+ internal_fetch(AckRequired, Q4a, MsgStatus, State1)
+ end, State).
+
+internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3_to_q4(State) of
- {empty, State1} = Result -> a(State1), Result;
- {loaded, State1} -> fetch(AckRequired, State1)
+ {empty, State1} = Result -> a(State1), Result;
+ {loaded, State1} -> internal_queue_out(Fun, State1)
end;
- {{value, MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk,
- msg_properties = MsgProperties }},
- Q4a} ->
-
- %% 1. Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- %% 2. Remove from msg_store and queue index, if necessary
- MsgStore = find_msg_store(IsPersistent),
- Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end,
- Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
- end,
-
- %% 3. If an ack is required, add something sensible to PA
- {AckTag, PA1} = case AckRequired of
- true -> PA2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, PA),
- {SeqId, PA2};
- false -> {blank_ack, PA}
- end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
- {{Msg, MsgProperties, IsDelivered, AckTag, Len1},
- a(State #vqstate { q4 = Q4a,
- ram_msg_count = RamMsgCount - 1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 })}
+ {{value, Value}, Q4a} ->
+ %% don't automatically overwrite the state with the popped
+ %% queue because some callbacks choose to rollback the pop
+ %% of the message from the queue
+ Fun(Value, Q4a, State)
end.
+internal_fetch(AckRequired, Q4a,
+ MsgStatus = #msg_status {
+ msg = Msg, guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
+ State = #vqstate {
+ ram_msg_count = RamMsgCount, out_counter = OutCount,
+ index_state = IndexState, len = Len, persistent_count = PCount,
+ pending_ack = PA }) ->
+ %% 1. Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ %% 2. Remove from msg_store and queue index, if necessary
+ MsgStore = find_msg_store(IsPersistent),
+ Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end,
+ Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
+ IndexState2 =
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
+ {false, true, false, _} -> Rem(), IndexState1;
+ {false, true, true, _} -> Rem(), Ack();
+ { true, true, true, false} -> Ack();
+ _ -> IndexState1
+ end,
+
+ %% 3. If an ack is required, add something sensible to PA
+ {AckTag, PA1} = case AckRequired of
+ true -> PA2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, PA),
+ {SeqId, PA2};
+ false -> {blank_ack, PA}
+ end,
+
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ Len1 = Len - 1,
+ {{Msg, IsDelivered, AckTag, Len1},
+ a(State #vqstate { q4 = Q4a,
+ ram_msg_count = RamMsgCount - 1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}.
+
ack(AckTags, State) ->
a(ack(fun rabbit_msg_store:remove/2,
fun (_AckEntry, State1) -> State1 end,