summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-03 02:00:12 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-03 02:00:12 +0000
commit7ff060c8f3ceecb05c577fbfb2bac1442ec10f63 (patch)
tree4a789973e7a4a10b06d496409ecf53545597812e /src/rabbit_amqqueue_process.erl
parent690dc6defcabe23efde8347b8fa8eb59097a1582 (diff)
downloadrabbitmq-server-7ff060c8f3ceecb05c577fbfb2bac1442ec10f63.tar.gz
Ensure all call sites of BQ:idle_timeout subsequently call run_message_queue to avoid violating queue invariantbug23793
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl15
1 files changed, 8 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3418c663..7c7e28fe 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -617,6 +617,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
+backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
+ maybe_run_queue_via_backing_queue(
+ fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
{Guids, BQS1} = Fun(BQS),
run_message_queue(
@@ -996,10 +1000,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
noreply(maybe_run_queue_via_backing_queue(Fun, State));
-handle_cast(sync_timeout, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS),
- sync_timer_ref = undefined});
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -1133,9 +1135,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
-handle_info(timeout, State = #q{backing_queue = BQ}) ->
- noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
+handle_info(timeout, State) ->
+ noreply(backing_queue_idle_timeout(State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};