From 81e7d8500bd6de9d67e6db984ed1c2d9b683814a Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 23 Mar 2012 22:21:15 +0000 Subject: ensure appropriate expiry of immediately delivered messages --- src/rabbit_amqqueue_process.erl | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b829f461..83d9ae22 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,9 +24,6 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MESSAGE_PROPERTIES, - #message_properties{expiry = undefined, needs_confirming = false}). - -export([start_link/1, info_keys/0]). -export([init_with_backing_queue_state/7]). @@ -530,15 +527,10 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {false, BQS1} -> DeliverFun = fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - %% we don't need an expiry here because - %% messages are not being enqueued, so we use - %% an empty message_properties. - {AckTag, BQS3} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = needs_confirming(Confirm)}, - SenderPid, BQS2), + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, @@ -565,8 +557,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), case Delivered of true -> State2; - false -> Props = (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, + false -> Props = message_properties(Confirm, State), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -694,8 +685,9 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(#q{ttl=TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL)}. +message_properties(Confirm, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL), + needs_confirming = needs_confirming(Confirm)}. calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). -- cgit v1.2.1