diff options
author | Michael Bridgen <mikeb@rabbitmq.com> | 2010-08-02 00:13:54 +0100 |
---|---|---|
committer | Michael Bridgen <mikeb@rabbitmq.com> | 2010-08-02 00:13:54 +0100 |
commit | de2899d8d01654cb2ecc260085afc0af0c3707d5 (patch) | |
tree | a5615135bcf2dab8f04f2eac4b2e14f4da27ad3b | |
parent | df11cdaf73e3d984a40d719ad170cabc6f24c5ba (diff) | |
parent | b5a6465dd30da72a30d0027d3da79afbbcbab4f9 (diff) | |
download | rabbitmq-server-de2899d8d01654cb2ecc260085afc0af0c3707d5.tar.gz |
Merge bug14647
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 11 |
3 files changed, 31 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6bf2f6db..870c119a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/4]). + stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). @@ -124,6 +124,7 @@ -spec(ack/4 :: (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid()) -> 'ok'). +-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()). -spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). @@ -367,6 +368,9 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). +reject(QPid, MsgIds, Requeue, ChPid) -> + delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 67f0fcf5..ac5fb7f9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -783,6 +783,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State#q{backing_queue_state = BQS1}) end; +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(case Requeue of + true -> requeue_and_run(AckTags, State); + false -> BQS1 = BQ:ack(AckTags, BQS), + State #q { backing_queue_state = BQS1 } + end) + end; + handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dd20d915..c4ff361d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -637,6 +637,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), {noreply, State2}; +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, + requeue = Requeue}, + _, State = #ch{ unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}; + handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, |