diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-23 18:12:35 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-23 18:12:35 +0000 |
commit | fced7c44a84af0a37298c575cf8430af23444085 (patch) | |
tree | 9132db6cef9163db22d33ba71deed9c2e9fa1dea | |
parent | 60ee7633c950c178ebd5a6587eddb27dfd48f90f (diff) | |
parent | ff72070653c4c4235353ecffa14810f453c4d319 (diff) | |
download | rabbitmq-server-fced7c44a84af0a37298c575cf8430af23444085.tar.gz |
Merging bug 23335 into default
-rw-r--r-- | src/rabbit_channel.erl | 41 |
1 files changed, 13 insertions, 28 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 800cc237..6bed63a3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -245,7 +245,9 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> - State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), + State1 = lock_message(AckRequired, + ack_record(DeliveryTag, ConsumerTag, Msg), + State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, maybe_incr_stats([{QPid, 1}], @@ -506,7 +508,9 @@ handle_method(#'basic.get'{queue = QueueNameBin, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> - State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + State1 = lock_message(not(NoAck), + ack_record(DeliveryTag, none, Msg), + State), maybe_incr_stats([{QPid, 1}], case NoAck of true -> get_no_ack; @@ -642,30 +646,8 @@ handle_method(#'basic.recover_async'{requeue = true}, %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{writer_pid = WriterPid, - unacked_message_q = UAMQ}) -> - ok = rabbit_misc:queue_fold( - fun ({_DeliveryTag, none, _Msg}, ok) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> - %% Was sent as a proper consumer delivery. Resend - %% it as before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - internal_deliver( - WriterPid, false, ConsumerTag, DeliveryTag, - {QName, QPid, MsgId, true, Message}) - end, ok, UAMQ), - %% No answer required - basic.recover is the newer, synchronous - %% variant of this method - {noreply, State}; +handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "requeue=false", []); handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> {noreply, State2 = #ch{writer_pid = WriterPid}} = @@ -987,6 +969,10 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +ack_record(DeliveryTag, ConsumerTag, + _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> + {DeliveryTag, ConsumerTag, {QPid, MsgId}}. + collect_acks(Q, 0, true) -> {Q, queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> @@ -1059,8 +1045,7 @@ rollback_and_notify(State) -> fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, - {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> + fun ({_DTag, _CTag, {QPid, MsgId}}, D) -> %% dict:append would avoid the lists:reverse in %% handle_message({recover, true}, ...). However, it %% is significantly slower when going beyond a few |