summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-03-23 22:21:15 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-03-23 22:21:15 +0000
commit81e7d8500bd6de9d67e6db984ed1c2d9b683814a (patch)
tree65d291b4e676ca464e86e2c8f9c8298b4821c9a1
parentbe8b476bbe52e6271cf29a2c13f08316c357852f (diff)
downloadrabbitmq-server-bug24827.tar.gz
ensure appropriate expiry of immediately delivered messagesbug24827
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 8 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b829f461..83d9ae22 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -24,9 +24,6 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(BASE_MESSAGE_PROPERTIES,
- #message_properties{expiry = undefined, needs_confirming = false}).
-
-export([start_link/1, info_keys/0]).
-export([init_with_backing_queue_state/7]).
@@ -530,15 +527,10 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{false, BQS1} ->
DeliverFun =
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- %% we don't need an expiry here because
- %% messages are not being enqueued, so we use
- %% an empty message_properties.
- {AckTag, BQS3} =
- BQ:publish_delivered(
- AckRequired, Message,
- (?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = needs_confirming(Confirm)},
- SenderPid, BQS2),
+ Props = message_properties(Confirm, State1),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ AckRequired, Message, Props,
+ SenderPid, BQS2),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS3}}
end,
@@ -565,8 +557,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
case Delivered of
true -> State2;
- false -> Props = (message_properties(State)) #message_properties{
- needs_confirming = needs_confirming(Confirm)},
+ false -> Props = message_properties(Confirm, State),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
@@ -694,8 +685,9 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(#q{ttl=TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL)}.
+message_properties(Confirm, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(TTL),
+ needs_confirming = needs_confirming(Confirm)}.
calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).