summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-01-14 15:41:52 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-01-14 15:41:52 +0000
commitc402e96e050192962e85c018c06844ed1670903e (patch)
tree61926259b0cab72fee8a499021e21d8af32e7667
parent0a186a2f2c71bc5b17d779d9062bd86c09edd90e (diff)
parenta6554f87ed0ee87274ec379cce537c16360bb7fd (diff)
downloadrabbitmq-server-c402e96e050192962e85c018c06844ed1670903e.tar.gz
Merge bug23675
-rw-r--r--src/rabbit_channel.erl25
1 files changed, 17 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e2c3694b..1e909686 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -554,6 +554,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
_ -> add_tx_participants(DeliveredQPids, State2)
end};
+handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = Multiple,
+ requeue = Requeue},
+ _, State) ->
+ reject(DeliveryTag, Requeue, Multiple, State);
+
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
@@ -739,14 +745,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{unacked_message_q = UAMQ}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ _, State) ->
+ reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1064,6 +1064,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}}.
+
ack_record(DeliveryTag, ConsumerTag,
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.