diff options
author | Emile Joubert <emile@rabbitmq.com> | 2010-06-11 12:43:56 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2010-06-11 12:43:56 +0100 |
commit | 794d53c6b884aa0de80d2a910b79992071fbe0bf (patch) | |
tree | abfeb0242157cbfc16e8af1bb5239adaa2836fc2 | |
parent | 10843dcf5a3c0506e89b916b5c29812d66ed923d (diff) | |
parent | bf981b28c61d00afc9a9a68f02c1d1212a547196 (diff) | |
download | rabbitmq-server-794d53c6b884aa0de80d2a910b79992071fbe0bf.tar.gz |
Merge heads
-rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 |
2 files changed, 14 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 76488255..3c9c41bd 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -268,7 +268,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_cast(QPid, {requeue, MsgIds, ChPid}). + delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). @@ -398,9 +398,6 @@ delegate_pcall(Pid, Pri, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). -delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). - delegate_pcast(Pid, Pri, Msg) -> delegate:invoke_no_result(Pid, fun (P) -> gen_server2:pcast(P, Pri, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3283cb66..5fdf0ffa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,6 +716,19 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); +handle_call({requeue, AckTags, ChPid}, From, State) -> + gen_server2:reply(From, ok), + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) + end; + handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). @@ -743,18 +756,6 @@ handle_cast({ack, Txn, AckTags, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, AckTags, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", - [ChPid]), - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end; - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, |