summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@rabbitmq.com>2010-08-02 00:13:54 +0100
committerMichael Bridgen <mikeb@rabbitmq.com>2010-08-02 00:13:54 +0100
commitde2899d8d01654cb2ecc260085afc0af0c3707d5 (patch)
treea5615135bcf2dab8f04f2eac4b2e14f4da27ad3b
parentdf11cdaf73e3d984a40d719ad170cabc6f24c5ba (diff)
parentb5a6465dd30da72a30d0027d3da79afbbcbab4f9 (diff)
downloadrabbitmq-server-de2899d8d01654cb2ecc260085afc0af0c3707d5.tar.gz
Merge bug14647
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_channel.erl11
3 files changed, 31 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6bf2f6db..870c119a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,7 +39,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/4]).
+ stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -124,6 +124,7 @@
-spec(ack/4 ::
(pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
-> 'ok').
+-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
-spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
@@ -367,6 +368,9 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+reject(QPid, MsgIds, Requeue, ChPid) ->
+ delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 67f0fcf5..ac5fb7f9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -783,6 +783,21 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State#q{backing_queue_state = BQS1})
end;
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(case Requeue of
+ true -> requeue_and_run(AckTags, State);
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State #q { backing_queue_state = BQS1 }
+ end)
+ end;
+
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dd20d915..c4ff361d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -637,6 +637,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
{noreply, State2};
+handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
+ requeue = Requeue},
+ _, State = #ch{ unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}};
+
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,