diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-02-20 17:20:02 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-02-20 17:20:02 +0000 |
commit | 21967553cfe335810ee6bf922e1b71ee8acfaa16 (patch) | |
tree | 2388771003111c36b683b984bbc21d8a21481067 | |
parent | 4bc0fdc948a9221d3136d306fc3beb4c04aa6575 (diff) | |
download | rabbitmq-server-21967553cfe335810ee6bf922e1b71ee8acfaa16.tar.gz |
Be more careful about where we send_drained from.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8ba9b4d2..5c376681 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -407,6 +407,12 @@ maybe_send_drained(#q{backing_queue = BQ, backing_queue_state = BQS}) -> false -> ok end. +maybe_send_drained(C, #q{backing_queue = BQ, backing_queue_state = BQS}) -> + case BQ:is_empty(BQS) of + true -> send_drained(C); + false -> ok + end. + send_drained() -> [send_drained(C) || C <- all_ch_record()]. send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> @@ -437,7 +443,6 @@ is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. deliver_msgs_to_consumers(_DeliverFun, true, State) -> - send_drained(), {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> @@ -603,12 +608,16 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). + State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), + maybe_send_drained(State1), + run_message_queue(State1). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}. + State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), + maybe_send_drained(State1), + {Result, State1}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -752,6 +761,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. +%% Logically this function should invoke maybe_send_drained/1. However, that's +%% expensive, and some frequent callers of drop_expired_msgs/1 (in particular +%% deliver_or_enqueue/3) cannot possibly cause the queue to become empty, so +%% instead we push the responsibility to the call sites. So be cautious when +%% adding new ones. drop_expired_msgs(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> case BQ:is_empty(BQS) of @@ -1154,7 +1168,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, AC1 = queue:in(E, State1#q.active_consumers), run_message_queue(State1#q{active_consumers = AC1}) end, - maybe_send_drained(State2), + maybe_send_drained(C1, State), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State2)), reply(ok, State2) @@ -1204,7 +1218,9 @@ handle_call({delete, IfUnused, IfEmpty}, From, handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + State1 = State#q{backing_queue_state = BQS1}, + maybe_send_drained(State1), + reply({ok, Count}, State1); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1367,7 +1383,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)), State1 = possibly_unblock( State, ChPid, fun(C) -> C#cr{limiter = Lim2} end), - maybe_send_drained(State1), + maybe_send_drained(lookup_ch(ChPid), State), noreply(State1); handle_cast(wake_up, State) -> @@ -1389,7 +1405,9 @@ handle_info(maybe_expire, State) -> end; handle_info(drop_expired, State) -> - noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined})); + State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), + maybe_send_drained(State1), + noreply(State1); handle_info(emit_stats, State) -> emit_stats(State), |