diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 19:41:11 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 19:41:11 +0000 |
commit | adf8c37c62b490fb42db906a318fe957ac39d299 (patch) | |
tree | e202d348f6880314fb7cf8e622f3df258343177f | |
parent | cf56799e48ed7dedbf0060f217b96d50c2868ddc (diff) | |
download | rabbitmq-server-adf8c37c62b490fb42db906a318fe957ac39d299.tar.gz |
introduce is_empty(State) helper
and in the resulting refactor also remove a non-linear BQS access in
handle_info/drop_expired.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9fd85bd6..fb60a043 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -353,9 +353,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(#q{active_consumers = AC, - backing_queue = BQ, backing_queue_state = BQS}) -> - true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). +assert_invariant(State = #q{active_consumers = AC}) -> + true = (queue:is_empty(AC) orelse is_empty(State)). + +is_empty({backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -401,16 +402,14 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -maybe_send_drained(true, _State) -> - ok; -maybe_send_drained(false, #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case BQ:is_empty(BQS) of +maybe_send_drained(WasEmpty, State) -> + case WasEmpty andalso is_empty(State) of true -> [send_drained(C) || C <- all_ch_record()]; false -> ok end. -maybe_send_drained_cons(C, #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case BQ:is_empty(BQS) of +maybe_send_drained_cons(C, State) -> + case is_empty(State) of true -> send_drained(C); false -> ok end. @@ -500,9 +499,8 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {Result, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} = - fetch(AckRequired, State), - {Result, BQ:is_empty(BQS), State1}. + {Result, State1} = fetch(AckRequired, State), + {Result, is_empty(State1), State1}. confirm_messages([], State) -> State; @@ -549,10 +547,10 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +run_message_queue(State) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, - BQ:is_empty(BQS), State), + is_empty(State), State), State1. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, @@ -766,9 +764,8 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> %% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot %% possibly cause the queue to become empty, we push the %% responsibility to the callers. So be cautious when adding new ones. -drop_expired_msgs(State = #q{backing_queue_state = BQS, - backing_queue = BQ }) -> - case BQ:is_empty(BQS) of +drop_expired_msgs(State) -> + case is_empty(State) of true -> State; false -> drop_expired_msgs(now_micros(), State) end. @@ -1404,10 +1401,10 @@ handle_info(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; -handle_info(drop_expired, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +handle_info(drop_expired, State) -> + WasEmpty = is_empty(State), State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), - maybe_send_drained(BQ:is_empty(BQS), State1), + maybe_send_drained(WasEmpty, State1), noreply(State1); handle_info(emit_stats, State) -> |