summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-16 10:58:31 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-16 10:58:31 +0000
commitade1d061c1f3ac97a02324f121e8ac1b03311ff5 (patch)
tree42bbfdfa7579a9c63cb83142e8333960bca4ade6
parent11681ae6031b69432626e7d92a699b07dd021c95 (diff)
downloadrabbitmq-server-ade1d061c1f3ac97a02324f121e8ac1b03311ff5.tar.gz
don't record anything confirm-related for immediate unroutable messages
-rw-r--r--src/rabbit_amqqueue_process.erl76
1 files changed, 50 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c4b5190..5dbc8828 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -439,19 +439,24 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
-record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- {never, State};
-record_confirm_message(#delivery{sender = ChPid,
+should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
+ never;
+should_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- State = #q{q = #amqqueue{durable = true},
- msg_id_to_channel = MTC}) ->
- {eventually,
- State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}};
-record_confirm_message(_Delivery, State) ->
- {immediately, State}.
+ #q{q = #amqqueue{durable = true}}) ->
+ {eventually, ChPid, MsgSeqNo, MsgId};
+should_confirm_message(_Delivery, _State) ->
+ immediately.
+
+record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
+ State = #q{msg_id_to_channel = MTC}) ->
+ State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)};
+record_confirm_message(Confirm, State)
+ when Confirm =:= immediately orelse Confirm =:= never ->
+ State.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -466,8 +471,9 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
- case NeedsConfirming of
+ Confirm,
+ State = #q{backing_queue = BQ}) ->
+ case Confirm of
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
@@ -477,40 +483,50 @@ attempt_delivery(#delivery{txn = none,
%% we don't need an expiry here because messages are
%% not being enqueued, so we use an empty
%% message_properties.
+ NeedsConfirming = case Confirm of
+ {eventually, _, _, _} -> true;
+ _ -> false
+ end,
{AckTag, BQS1} =
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= eventually)},
+ needs_confirming = NeedsConfirming},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
{Delivered, State1} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
- {Delivered, NeedsConfirming, State1};
+ {Delivered, Confirm, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- {NeedsConfirming, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}}) ->
+ Confirm,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
- {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
+ {true, Confirm, State#q{backing_queue_state = BQS1}}.
deliver_or_enqueue(Delivery, State) ->
- case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, _, State1} ->
- State1;
- {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
+ case attempt_delivery(Delivery,
+ should_confirm_message(Delivery, State), State) of
+ {true, Confirm, State1} ->
+ record_confirm_message(Confirm, State1);
+ {false, Confirm, State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}} ->
#delivery{message = Message} = Delivery,
+ NeedsConfirming = case Confirm of
+ {eventually, _, _, _} -> true;
+ _ -> false
+ end,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
- needs_confirming =
- (NeedsConfirming =:= eventually)},
+ needs_confirming = NeedsConfirming},
BQS),
- ensure_ttl_timer(State1#q{backing_queue_state = BQS1})
+ State2 = record_confirm_message(Confirm, State1),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
@@ -829,9 +845,17 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, _NeedsConfirming, State1} =
- attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, State1);
+ {Delivered, Confirm, State1} =
+ attempt_delivery(Delivery,
+ should_confirm_message(Delivery, State),
+ State),
+ State2 = case {Confirm, Delivered} of
+ {{eventually, _, _, _}, true} ->
+ record_confirm_message(Confirm, State);
+ _ ->
+ State1
+ end,
+ reply(Delivered, State2);
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.