summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:41:11 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:41:11 +0000
commitadf8c37c62b490fb42db906a318fe957ac39d299 (patch)
treee202d348f6880314fb7cf8e622f3df258343177f
parentcf56799e48ed7dedbf0060f217b96d50c2868ddc (diff)
downloadrabbitmq-server-adf8c37c62b490fb42db906a318fe957ac39d299.tar.gz
introduce is_empty(State) helper
and in the resulting refactor also remove a non-linear BQS access in handle_info/drop_expired.
-rw-r--r--src/rabbit_amqqueue_process.erl37
1 files changed, 17 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9fd85bd6..fb60a043 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -353,9 +353,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(#q{active_consumers = AC,
- backing_queue = BQ, backing_queue_state = BQS}) ->
- true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
+assert_invariant(State = #q{active_consumers = AC}) ->
+ true = (queue:is_empty(AC) orelse is_empty(State)).
+
+is_empty({backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -401,16 +402,14 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-maybe_send_drained(true, _State) ->
- ok;
-maybe_send_drained(false, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- case BQ:is_empty(BQS) of
+maybe_send_drained(WasEmpty, State) ->
+ case WasEmpty andalso is_empty(State) of
true -> [send_drained(C) || C <- all_ch_record()];
false -> ok
end.
-maybe_send_drained_cons(C, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- case BQ:is_empty(BQS) of
+maybe_send_drained_cons(C, State) ->
+ case is_empty(State) of
true -> send_drained(C);
false -> ok
end.
@@ -500,9 +499,8 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {Result, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} =
- fetch(AckRequired, State),
- {Result, BQ:is_empty(BQS), State1}.
+ {Result, State1} = fetch(AckRequired, State),
+ {Result, is_empty(State1), State1}.
confirm_messages([], State) ->
State;
@@ -549,10 +547,10 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+run_message_queue(State) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State),
+ is_empty(State), State),
State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
@@ -766,9 +764,8 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot
%% possibly cause the queue to become empty, we push the
%% responsibility to the callers. So be cautious when adding new ones.
-drop_expired_msgs(State = #q{backing_queue_state = BQS,
- backing_queue = BQ }) ->
- case BQ:is_empty(BQS) of
+drop_expired_msgs(State) ->
+ case is_empty(State) of
true -> State;
false -> drop_expired_msgs(now_micros(), State)
end.
@@ -1404,10 +1401,10 @@ handle_info(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
-handle_info(drop_expired, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+handle_info(drop_expired, State) ->
+ WasEmpty = is_empty(State),
State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
- maybe_send_drained(BQ:is_empty(BQS), State1),
+ maybe_send_drained(WasEmpty, State1),
noreply(State1);
handle_info(emit_stats, State) ->