diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-12 22:34:33 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-12 22:34:33 +0000 |
commit | dd65549abcab4554f46ff1f41057b81c3fa00812 (patch) | |
tree | db47df98c4565c9ed531c6d1a4f35ca440c2f26a | |
parent | 77ca2f1fa64f59f4e5c7bc67b214d2ca35cc2498 (diff) | |
download | rabbitmq-server-dd65549abcab4554f46ff1f41057b81c3fa00812.tar.gz |
drop expired messages post basic_getbug25448
...so messages with an expiry that are at the head of the queue after
a basic.get do not get stuck there in the absence of other queue
activity.
Rather than simply adding a call to drop_expired_messages/1 after the
call to fetch/1 in the basic_get code, we insert the call into
fetch/1, which allows us to remove it from the other call site. Thus
fetch/1 preserves the invariant we are after, namely that whenever a
queue has a message at the head with an expiry, there is a timer set
to drop said message.
Note that the message count returned by basic.get does not reflect the
dropping of expired messages after the fetched message. That's ok
since we make no guarantee that messages are expired straight
away. And note that on 'default' (rather than 'stable') the behaviour
is actually different; due to various other changes there we will in
fact return the reduced count.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 |
1 files changed, 4 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 49fcf070..e3885644 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -478,11 +478,10 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {{Message, IsDelivered, AckTag, _Remaining}, State1} = + {{Message, IsDelivered, AckTag, _Remaining}, + State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} = fetch(AckRequired, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_messages(State1), - {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}. + {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State1}. confirm_messages([], State) -> State; @@ -579,7 +578,7 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, State#q{backing_queue_state = BQS1}}. + {Result, drop_expired_messages(State#q{backing_queue_state = BQS1})}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, |