diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 08:53:23 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 08:53:23 +0000 |
commit | 608ba6ef42d63b95c744d5d744ab1e4181f6ce45 (patch) | |
tree | 98e329f8b8889b20e158723d77dd484d7d3fbf71 | |
parent | c8044c53b6a8eed5b685ff263b4ffbcba37a98c7 (diff) | |
download | rabbitmq-server-608ba6ef42d63b95c744d5d744ab1e4181f6ce45.tar.gz |
cosmetic
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 101 |
1 files changed, 44 insertions, 57 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7719dfe7..24de9415 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -283,17 +283,16 @@ lookup_ch(ChPid) -> ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of - undefined -> - MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumer_count = 0, - ch_pid = ChPid, - monitor_ref = MonitorRef, - acktags = sets:new(), - is_limit_active = false, - txn = none, - unsent_message_count = 0}, - put(Key, C), - C; + undefined -> MonitorRef = erlang:monitor(process, ChPid), + C = #cr{consumer_count = 0, + ch_pid = ChPid, + monitor_ref = MonitorRef, + acktags = sets:new(), + is_limit_active = false, + txn = none, + unsent_message_count = 0}, + put(Key, C), + C; C = #cr{} -> C end. @@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -all_ch_record() -> - [C || {{ch, _}, C} <- get()]. +all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> - BlockedOld = is_ch_blocked(OldCR), - BlockedNew = is_ch_blocked(NewCR), - if BlockedOld andalso not(BlockedNew) -> unblock; - BlockedNew andalso not(BlockedOld) -> block; - true -> ok + case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of + {true, false} -> unblock; + {false, true} -> block; + {_, _} -> ok end. deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, @@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), BlockedConsumers}; - block -> - {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} + block -> {ActiveConsumers1, BlockedConsumers1} = + move_consumers(ChPid, + ActiveConsumersTail, + BlockedConsumers), + {ActiveConsumers1, + queue:in(QEntry, BlockedConsumers1)} end, State2 = State1#q{ active_consumers = NewActiveConsumers, @@ -396,8 +392,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {FunAcc, State} end. -deliver_from_queue_pred(IsEmpty, _State) -> - not IsEmpty. +deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = @@ -405,17 +400,16 @@ deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> - {CMs, GTC1} = - lists:foldl( - fun(Guid, {CMs, GTC0}) -> - case dict:find(Guid, GTC0) of - {ok, {ChPid, MsgSeqNo}} -> - {gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(Guid, GTC0)}; - _ -> - {CMs, GTC0} - end - end, {gb_trees:empty(), GTC}, Guids), + {CMs, GTC1} = lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {gb_trees_cons(ChPid, MsgSeqNo, CMs), + dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {gb_trees:empty(), GTC}, Guids), gb_trees:map(fun(ChPid, MsgSeqNos) -> rabbit_channel:confirm(ChPid, MsgSeqNos) end, CMs), @@ -480,17 +474,14 @@ attempt_delivery(#delivery{txn = none, {Delivered, State1} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), {Delivered, NeedsConfirming, State1}; -attempt_delivery(#delivery{txn = Txn, +attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - {NeedsConfirming, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}}) -> + {NeedsConfirming, State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - {true, - NeedsConfirming, - State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. + BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), + {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of @@ -661,9 +652,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = now_micros(), BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> - Now > Expiry - end, BQS), + fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -814,8 +804,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery}, - _From, State) -> +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -906,15 +895,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, case is_ch_blocked(C) of true -> State1#q{ blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; + add_consumer(ChPid, Consumer, + State1#q.blocked_consumers)}; false -> run_message_queue( State1#q{ active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) + add_consumer(ChPid, Consumer, + State1#q.active_consumers)}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck), |