diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-10 14:08:30 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-10 14:08:30 +0000 |
commit | ca39327fb2b65218785d66a4aa66192783277d60 (patch) | |
tree | 64f989caae6bec8f92021aedf6e4397ac0f63bf9 | |
parent | e034b065ed63dc1a097b11365b584bae177baf34 (diff) | |
download | rabbitmq-server-ca39327fb2b65218785d66a4aa66192783277d60.tar.gz |
remove duplication of erroneous logic
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e6e3989f..a1342289 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -436,7 +436,7 @@ confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> State#q{guid_to_channel = dict:erase(Guid, GTC)}. record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - State; + {'no_confirm', State}; record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { @@ -445,9 +445,9 @@ record_confirm_message(#delivery{sender = ChPid, State = #q{guid_to_channel = GTC, q = #amqqueue{durable = true}}) -> - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}; + {'confirm', State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; record_confirm_message(_Delivery, State) -> - State. + {'no_confirm', State}. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -462,12 +462,11 @@ attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, - State = #q{backing_queue = BQ, q = Q}) -> - NeedsConfirming = Message#basic_message.is_persistent andalso - Q#amqqueue.durable, - case NeedsConfirming of - false -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok + {NeedsConfirming, State = #q{backing_queue = BQ}}) -> + case {NeedsConfirming, MsgSeqNo} of + {_, undefined} -> ok; + {'no_confirm', _} -> rabbit_channel:confirm(ChPid, MsgSeqNo); + {'confirm', _} -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -479,31 +478,36 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = NeedsConfirming}, + needs_confirming = (NeedsConfirming =:= 'confirm')}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); + {Delivered, State1} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), + {Delivered, NeedsConfirming, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + {NeedsConfirming, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> record_current_channel_tx(ChPid, Txn), {true, + NeedsConfirming, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, State1} -> + {true, _, State1} -> {true, State1}; - {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + {false, NC, State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}} -> + #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ - needs_confirming = (MsgSeqNo =/= undefined)}, + needs_confirming = (NC =:= 'confirm')}, BQS), {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. @@ -827,7 +831,7 @@ handle_call({deliver_immediately, Delivery}, %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, State1} = + {Delivered, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), reply(Delivered, State1); |