diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-10-20 16:22:35 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-10-20 16:22:35 +0100 |
commit | aade042ed7a485c21481bee2ffaf7652652711a2 (patch) | |
tree | 85a60f29187e5b02d49ee48753abf5e14f2c0751 | |
parent | 54e0b5f85c133dc164a4c927c42a278aabd7dbfa (diff) | |
download | rabbitmq-server-aade042ed7a485c21481bee2ffaf7652652711a2.tar.gz |
Refrain from resetting message expiry in message requeue
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 7 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 47 |
7 files changed, 36 insertions, 50 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 20fe4234..4a657951 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -22,9 +22,6 @@ -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(confirm_required() :: boolean()). --type(message_properties_transformer() :: - fun ((rabbit_types:message_properties()) - -> rabbit_types:message_properties())). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). @@ -51,7 +48,7 @@ -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). --spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) +-spec(requeue/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 46f6674b..62ccbcb0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -554,9 +554,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> run_backing_queue( BQ, fun (M, BQS) -> - {_MsgIds, BQS1} = - M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), - BQS1 + {_MsgIds, BQS1} = M:requeue(AckTags, BQS), BQS1 end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -670,11 +668,6 @@ discard_delivery(#delivery{sender = ChPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. -reset_msg_expiry_fun(TTL) -> - fun(MsgProps) -> - MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} - end. - message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 77278416..c3b322ee 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -107,7 +107,7 @@ behaviour_info(callbacks) -> %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 3}, + {requeue, 2}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 328fe639..f60562ef 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3]). @@ -248,11 +248,11 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -requeue(AckTags, MsgPropsFun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), - ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}), +requeue(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f423760a..7182042d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -827,14 +827,14 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; -process_instruction({requeue, MsgPropsFun, MsgIds}, +process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {ok, case length(AckTags) =:= length(MsgIds) of true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }; false -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3a4f6f84..fcfd5557 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2238,11 +2238,10 @@ test_variable_queue_requeue(VQ0) -> (_, Acc) -> Acc end, [], lists:zip(Acks, Seq)), - {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, - fun(X) -> X end, VQ3), + {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3), VQ5 = lists:foldl(fun (AckTag, VQN) -> {_MsgId, VQM} = rabbit_variable_queue:requeue( - [AckTag], fun(X) -> X end, VQN), + [AckTag], VQN), VQM end, VQ4, Subset), VQ6 = lists:foldl(fun (AckTag, VQa) -> @@ -2426,7 +2425,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), {_Guids, VQ4} = - rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 895fc388..1de7be32 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1, + dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, @@ -628,21 +628,19 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> +requeue(AckTags, #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), - fun publish_alpha/2, - MsgPropsFun, State), + fun publish_alpha/2, State), {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, delta_limit(Delta), - fun publish_beta/2, - MsgPropsFun, State1), + fun publish_beta/2, State1), {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, - MsgPropsFun, State2), + State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( State3 #vqstate { delta = Delta1, @@ -1335,50 +1333,49 @@ publish_beta(MsgStatus, State) -> ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. %% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) -> +queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, - Limit, PubFun, MsgPropsFun, State). + Limit, PubFun, State). queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, - Limit, PubFun, MsgPropsFun, State) + Limit, PubFun, State) when Limit == undefined orelse SeqId < Limit -> case ?QUEUE:out(Q) of {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} when SeqIdQ < SeqId -> %% enqueue from the remaining queue queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, - Limit, PubFun, MsgPropsFun, State); + Limit, PubFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, - State), + {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, MsgPropsFun, State2) + Limit, PubFun, State2) end; queue_merge(SeqIds, Q, Front, MsgIds, - _Limit, _PubFun, _MsgPropsFun, State) -> + _Limit, _PubFun, State) -> {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. -delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> +delta_merge([], Delta, MsgIds, State) -> {Delta, MsgIds, State}; -delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) -> +delta_merge(SeqIds, Delta, MsgIds, State) -> lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State0), + msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 -msg_from_pending_ack(SeqId, MsgPropsFun, State) -> +msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = remove_pending_ack(SeqId, State), {MsgStatus #msg_status { - msg_props = (MsgPropsFun(MsgProps)) #message_properties { - needs_confirming = false } }, State1}. + msg_props = MsgProps #message_properties { needs_confirming = false } }, + State1}. beta_limit(Q) -> case ?QUEUE:peek(Q) of |