summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:54:22 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:54:22 +0000
commit6bde32b8f6af20212b815d62e5aac051af14d62f (patch)
tree04753300ee00628088e80deed3272d2ab0b80b9b
parente7a061a08b7c91393beaf7d377b5efb8a9c633db (diff)
downloadrabbitmq-server-6bde32b8f6af20212b815d62e5aac051af14d62f.tar.gz
introduce is_empty(State) helper in rabbit_amqqueue_process
ported from bug 23749
-rw-r--r--src/rabbit_amqqueue_process.erl21
1 files changed, 10 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index be7ee097..4f0da702 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -347,9 +347,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(#q{active_consumers = AC,
- backing_queue = BQ, backing_queue_state = BQS}) ->
- true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
+assert_invariant(State = #q{active_consumers = AC}) ->
+ true = (queue:is_empty(AC) orelse is_empty(State)).
+
+is_empty({backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -470,9 +471,8 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {Result, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} =
- fetch(AckRequired, State),
- {Result, BQ:is_empty(BQS), State1}.
+ {Result, State1} = fetch(AckRequired, State),
+ {Result, is_empty(State1), State1}.
confirm_messages([], State) ->
State;
@@ -519,10 +519,10 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+run_message_queue(State) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State),
+ is_empty(State), State),
State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
@@ -721,9 +721,8 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_msgs(State = #q{backing_queue_state = BQS,
- backing_queue = BQ }) ->
- case BQ:is_empty(BQS) of
+drop_expired_msgs(State) ->
+ case is_empty(State) of
true -> State;
false -> drop_expired_msgs(now_micros(), State)
end.