diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-31 14:32:30 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-31 14:32:30 +0000 |
commit | f27016fde8999bd0829cb4c4846c722f4bd8daf1 (patch) | |
tree | b7d6364a4a4beaf1ed1af185e7e4b57df4c2e2ef | |
parent | b3efbd8b78d4a115a6443def25f5fa29304999e6 (diff) | |
download | rabbitmq-server-f27016fde8999bd0829cb4c4846c722f4bd8daf1.tar.gz |
confirm transient messages in the queues
Transient messages are now handled exactly like persistent messages:
they are ack'd only after all of the queues have ack'd. Unroutable
messages and messages routed to 0 queues are still ack'd by the
channel.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 20 |
2 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f6255d2e..e78eb06f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -426,6 +426,9 @@ confirm_message(#basic_message{guid = Guid}, State) -> record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> State; +record_confirm_message(#delivery{message = #basic_message{ + is_persistent = false}}, State) -> + State; record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, sender = ChPid, message = #basic_message{guid = Guid}}, @@ -447,9 +450,15 @@ run_message_queue(State) -> State2. attempt_delivery(#delivery{txn = none, + sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ}) -> + IsPersistent = Message#basic_message.is_persistent, + case IsPersistent of + false -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -460,8 +469,7 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered(AckRequired, Message, ?BASE_MESSAGE_PROPERTIES #message_properties { - needs_confirming = - (MsgSeqNo =/= undefined)}, + needs_confirming = IsPersistent }, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7c45b52d..fdf4bdc2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -556,12 +556,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, - case IsPersistent of - true -> MsgSeqNo; - false -> undefined - end)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1245,19 +1241,17 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> +process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> +process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, [], _, MsgSeqNo, _, State) -> +process_routing_result(routed, [], MsgSeqNo, _, State) -> send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_result(routed, _, undefined, _, State) -> State; -process_routing_result(routed, _, false, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, QPids, true, MsgSeqNo, _, +process_routing_result(routed, QPids, MsgSeqNo, _, State = #ch{queues_for_msg = QFM}) -> QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], |