summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-12 22:34:33 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-12 22:34:33 +0000
commitdd65549abcab4554f46ff1f41057b81c3fa00812 (patch)
treedb47df98c4565c9ed531c6d1a4f35ca440c2f26a
parent77ca2f1fa64f59f4e5c7bc67b214d2ca35cc2498 (diff)
downloadrabbitmq-server-dd65549abcab4554f46ff1f41057b81c3fa00812.tar.gz
drop expired messages post basic_getbug25448
...so messages with an expiry that are at the head of the queue after a basic.get do not get stuck there in the absence of other queue activity. Rather than simply adding a call to drop_expired_messages/1 after the call to fetch/1 in the basic_get code, we insert the call into fetch/1, which allows us to remove it from the other call site. Thus fetch/1 preserves the invariant we are after, namely that whenever a queue has a message at the head with an expiry, there is a timer set to drop said message. Note that the message count returned by basic.get does not reflect the dropping of expired messages after the fetched message. That's ok since we make no guarantee that messages are expired straight away. And note that on 'default' (rather than 'stable') the behaviour is actually different; due to various other changes there we will in fact return the reduced count.
-rw-r--r--src/rabbit_amqqueue_process.erl9
1 files changed, 4 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 49fcf070..e3885644 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -478,11 +478,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining}, State1} =
+ {{Message, IsDelivered, AckTag, _Remaining},
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} =
fetch(AckRequired, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State1),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
+ {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State1}.
confirm_messages([], State) ->
State;
@@ -579,7 +578,7 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ,
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, State#q{backing_queue_state = BQS1}}.
+ {Result, drop_expired_messages(State#q{backing_queue_state = BQS1})}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,