summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-31 14:32:30 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-31 14:32:30 +0000
commitf27016fde8999bd0829cb4c4846c722f4bd8daf1 (patch)
treeb7d6364a4a4beaf1ed1af185e7e4b57df4c2e2ef
parentb3efbd8b78d4a115a6443def25f5fa29304999e6 (diff)
downloadrabbitmq-server-f27016fde8999bd0829cb4c4846c722f4bd8daf1.tar.gz
confirm transient messages in the queues
Transient messages are now handled exactly like persistent messages: they are ack'd only after all of the queues have ack'd. Unroutable messages and messages routed to 0 queues are still ack'd by the channel.
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_channel.erl20
2 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f6255d2e..e78eb06f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -426,6 +426,9 @@ confirm_message(#basic_message{guid = Guid}, State) ->
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
State;
+record_confirm_message(#delivery{message = #basic_message{
+ is_persistent = false}}, State) ->
+ State;
record_confirm_message(#delivery{msg_seq_no = MsgSeqNo,
sender = ChPid,
message = #basic_message{guid = Guid}},
@@ -447,9 +450,15 @@ run_message_queue(State) ->
State2.
attempt_delivery(#delivery{txn = none,
+ sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ}) ->
+ IsPersistent = Message#basic_message.is_persistent,
+ case IsPersistent of
+ false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
+ _ -> ok
+ end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -460,8 +469,7 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(AckRequired, Message,
?BASE_MESSAGE_PROPERTIES
#message_properties {
- needs_confirming =
- (MsgSeqNo =/= undefined)},
+ needs_confirming = IsPersistent },
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7c45b52d..fdf4bdc2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -556,12 +556,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- case IsPersistent of
- true -> MsgSeqNo;
- false -> undefined
- end)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1245,19 +1241,17 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) ->
+process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) ->
+process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, [], _, MsgSeqNo, _, State) ->
+process_routing_result(routed, [], MsgSeqNo, _, State) ->
send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, _, _, undefined, _, State) ->
+process_routing_result(routed, _, undefined, _, State) ->
State;
-process_routing_result(routed, _, false, MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, QPids, true, MsgSeqNo, _,
+process_routing_result(routed, QPids, MsgSeqNo, _,
State = #ch{queues_for_msg = QFM}) ->
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],