summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-03 15:26:28 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-03 15:26:28 +0000
commitff72070653c4c4235353ecffa14810f453c4d319 (patch)
treeda3eee822d5095f32e9f6b86771b7458f1cd828c
parentf525c97b059b62cf68361cd70e346339aa088582 (diff)
parentaad2fe5095630ca36311e705c6ceb37e6ebeb6cd (diff)
downloadrabbitmq-server-bug23335.tar.gz
merge with defaultbug23335
-rw-r--r--src/rabbit_channel.erl41
1 files changed, 13 insertions, 28 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 873268cd..98acc181 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -245,7 +245,9 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired, Msg},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
- State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
+ State1 = lock_message(AckRequired,
+ ack_record(DeliveryTag, ConsumerTag, Msg),
+ State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
{_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
maybe_incr_stats([{QPid, 1}],
@@ -504,7 +506,9 @@ handle_method(#'basic.get'{queue = QueueNameBin,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content}}} ->
- State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
+ State1 = lock_message(not(NoAck),
+ ack_record(DeliveryTag, none, Msg),
+ State),
maybe_incr_stats([{QPid, 1}],
case NoAck of
true -> get_no_ack;
@@ -640,30 +644,8 @@ handle_method(#'basic.recover_async'{requeue = true},
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover_async'{requeue = false},
- _, State = #ch{writer_pid = WriterPid,
- unacked_message_q = UAMQ}) ->
- ok = rabbit_misc:queue_fold(
- fun ({_DeliveryTag, none, _Msg}, ok) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
- %% Was sent as a proper consumer delivery. Resend
- %% it as before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- internal_deliver(
- WriterPid, false, ConsumerTag, DeliveryTag,
- {QName, QPid, MsgId, true, Message})
- end, ok, UAMQ),
- %% No answer required - basic.recover is the newer, synchronous
- %% variant of this method
- {noreply, State};
+handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
{noreply, State2 = #ch{writer_pid = WriterPid}} =
@@ -985,6 +967,10 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+ack_record(DeliveryTag, ConsumerTag,
+ _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
+ {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+
collect_acks(Q, 0, true) ->
{Q, queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
@@ -1057,8 +1043,7 @@ rollback_and_notify(State) ->
fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag,
- {_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
+ fun ({_DTag, _CTag, {QPid, MsgId}}, D) ->
%% dict:append would avoid the lists:reverse in
%% handle_message({recover, true}, ...). However, it
%% is significantly slower when going beyond a few