diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-17 12:01:57 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-17 12:01:57 +0000 |
commit | 8ba9a5eb2bf173fe0b2d8eafe086ad5b98ab8253 (patch) | |
tree | 53b0918efb59ced87c6a07274ea65b685e1d2f93 | |
parent | 916f0d7414bdab0ce0b28be5c2f8b61af461b5ba (diff) | |
parent | e74f420db40c772e77454dd05f32f7c172a2156a (diff) | |
download | rabbitmq-server-8ba9a5eb2bf173fe0b2d8eafe086ad5b98ab8253.tar.gz |
merging bug23929 to default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 |
1 files changed, 45 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c4b5190..3f5758ce 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -439,19 +439,26 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - {never, State}; -record_confirm_message(#delivery{sender = ChPid, +should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> + never; +should_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, - State = #q{q = #amqqueue{durable = true}, - msg_id_to_channel = MTC}) -> - {eventually, - State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}}; -record_confirm_message(_Delivery, State) -> - {immediately, State}. + #q{q = #amqqueue{durable = true}}) -> + {eventually, ChPid, MsgSeqNo, MsgId}; +should_confirm_message(_Delivery, _State) -> + immediately. + +needs_confirming({eventually, _, _, _}) -> true; +needs_confirming(_) -> false. + +maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, + State = #q{msg_id_to_channel = MTC}) -> + State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}; +maybe_record_confirm_message(_Confirm, State) -> + State. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -465,9 +472,10 @@ run_message_queue(State) -> attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, - msg_seq_no = MsgSeqNo}, - {NeedsConfirming, State = #q{backing_queue = BQ}}) -> - case NeedsConfirming of + msg_seq_no = MsgSeqNo} = Delivery, + State = #q{backing_queue = BQ}) -> + Confirm = should_confirm_message(Delivery, State), + case Confirm of immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, @@ -481,36 +489,36 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = (NeedsConfirming =:= eventually)}, + needs_confirming = needs_confirming(Confirm)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, {Delivered, State1} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), - {Delivered, NeedsConfirming, State1}; + {Delivered, Confirm, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, - message = Message}, - {NeedsConfirming, State = #q{backing_queue = BQ, - backing_queue_state = BQS}}) -> + message = Message} = Delivery, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), - {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. - -deliver_or_enqueue(Delivery, State) -> - case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, _, State1} -> - State1; - {false, NeedsConfirming, State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}} -> - #delivery{message = Message} = Delivery, - BQS1 = BQ:publish(Message, - (message_properties(State)) #message_properties{ - needs_confirming = - (NeedsConfirming =:= eventually)}, - BQS), - ensure_ttl_timer(State1#q{backing_queue_state = BQS1}) + {true, should_confirm_message(Delivery, State), + State#q{backing_queue_state = BQS1}}. + +deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> + {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + case Delivered of + true -> State2; + false -> BQS1 = + BQ:publish(Message, + (message_properties(State)) #message_properties{ + needs_confirming = needs_confirming(Confirm)}, + BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> @@ -829,9 +837,11 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, _NeedsConfirming, State1} = - attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, State1); + {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + reply(Delivered, case Delivered of + true -> maybe_record_confirm_message(Confirm, State1); + false -> State1 + end); handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. |