summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-01-11 17:06:07 +0000
committerRob Harrop <rob@rabbitmq.com>2011-01-11 17:06:07 +0000
commit36eee0ca2a10a05895e8f48cf22e5c3a2b6f2085 (patch)
tree938dcdf551afa7c314e609ba53a25c96ba9e7412
parent8b273beb6e40b0647e1ee8e532f57b4220785bd0 (diff)
downloadrabbitmq-server-36eee0ca2a10a05895e8f48cf22e5c3a2b6f2085.tar.gz
Extract common logic for message rejection. Introduce basic.nack and rework basic.reject
-rw-r--r--src/rabbit_channel.erl24
1 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 930e48e6..1ad92318 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -568,6 +568,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
_ -> add_tx_participants(DeliveredQPids, State2)
end};
+handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = Multiple},
+ _, State) ->
+ reject(DeliveryTag, true, Multiple, State);
+
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
@@ -753,14 +758,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{unacked_message_q = UAMQ}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ _, State) ->
+ reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1078,6 +1077,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}}.
+
ack_record(DeliveryTag, ConsumerTag,
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.