summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl25
2 files changed, 14 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 76488255..3c9c41bd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -268,7 +268,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
@@ -398,9 +398,6 @@ delegate_pcall(Pid, Pri, Msg, Timeout) ->
delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
-
delegate_pcast(Pid, Pri, Msg) ->
delegate:invoke_no_result(Pid,
fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3283cb66..5fdf0ffa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,6 +716,19 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
+handle_call({requeue, AckTags, ChPid}, From, State) ->
+ gen_server2:reply(From, ok),
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
+ [ChPid]),
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(requeue_and_run(AckTags, State))
+ end;
+
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
@@ -743,18 +756,6 @@ handle_cast({ack, Txn, AckTags, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, AckTags, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
- [ChPid]),
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end;
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,