diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 13:36:31 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 13:36:31 +0000 |
commit | 7556d6ae1e71ffc07dab7666216e94bbd91c1dec (patch) | |
tree | 5339db8ca5ad6253db1a27473d36a0918e77a339 | |
parent | 157d3f401c729df060b32327385211b27a0e0105 (diff) | |
download | rabbitmq-server-7556d6ae1e71ffc07dab7666216e94bbd91c1dec.tar.gz |
refactorings
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 56 |
1 files changed, 23 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 96352c13..4ebdb7a3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -454,11 +454,10 @@ should_confirm_message(_Delivery, _State) -> needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. -record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, - State = #q{msg_id_to_channel = MTC}) -> +maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, + State = #q{msg_id_to_channel = MTC}) -> State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}; -record_confirm_message(Confirm, State) - when Confirm =:= immediately orelse Confirm =:= never -> +maybe_record_confirm_message(_Confirm, State) -> State. run_message_queue(State) -> @@ -473,9 +472,9 @@ run_message_queue(State) -> attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, - msg_seq_no = MsgSeqNo}, - Confirm, + msg_seq_no = MsgSeqNo} = Delivery, State = #q{backing_queue = BQ}) -> + Confirm = should_confirm_message(Delivery, State), case Confirm of immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok @@ -500,28 +499,26 @@ attempt_delivery(#delivery{txn = none, {Delivered, Confirm, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, - message = Message}, - Confirm, + message = Message} = Delivery, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), - {true, Confirm, State#q{backing_queue_state = BQS1}}. - -deliver_or_enqueue(Delivery, State) -> - case attempt_delivery(Delivery, - should_confirm_message(Delivery, State), State) of - {true, Confirm, State1} -> - record_confirm_message(Confirm, State1); - {false, Confirm, State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}} -> - #delivery{message = Message} = Delivery, - BQS1 = BQ:publish(Message, - (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, - BQS), - State2 = record_confirm_message(Confirm, State1), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + {true, should_confirm_message(Delivery, State), + State#q{backing_queue_state = BQS1}}. + +deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> + {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + case Delivered of + true -> State2; + false -> BQS1 = + BQ:publish(Message, + (message_properties(State)) #message_properties{ + needs_confirming = needs_confirming(Confirm)}, + BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> @@ -840,15 +837,8 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, Confirm, State1} = - attempt_delivery(Delivery, - should_confirm_message(Delivery, State), - State), - State2 = case Delivered andalso needs_confirming(Confirm) of - true -> record_confirm_message(Confirm, State); - false -> State1 - end, - reply(Delivered, State2); + {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + reply(Delivered, maybe_record_confirm_message(Confirm, State1)); handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. |