diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-17 11:10:06 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-17 11:10:06 +0100 |
commit | b25c1f560c34ea3595b017499e7759c0ab7decb3 (patch) | |
tree | 66200b9278d5eb49ce33d61e349cdcedad1519ca | |
parent | 525962e167fdebb69f10d19422e0c3aebfa8c6fc (diff) | |
download | rabbitmq-server-b25c1f560c34ea3595b017499e7759c0ab7decb3.tar.gz |
rename maybe_store_ch_record to update_ch_record and drop assertions
...and some associated cosmetic shuffling and inlining.
The assertions aren't all that useful and just clutter the code.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 |
1 files changed, 20 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d41db65a..82ed2ec8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -348,19 +348,18 @@ ch_record(ChPid) -> C = #cr{} -> C end. -store_ch_record(C = #cr{ch_pid = ChPid}) -> - put({ch, ChPid}, C). - -maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, - acktags = ChAckTags, - unsent_message_count = UnsentMessageCount}) -> +update_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + unsent_message_count = UnsentMessageCount}) -> case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of - {0, 0, 0} -> ok = erase_ch_record(C), - false; - _ -> store_ch_record(C), - true + {0, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) end. +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C), + ok. + erase_ch_record(#cr{ch_pid = ChPid, limiter = Limiter, monitor_ref = MonitorRef}) -> @@ -408,7 +407,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, - true = maybe_store_ch_record(NewC), + update_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -426,7 +425,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> - true = maybe_store_ch_record(C#cr{is_limit_active = true}), + update_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, @@ -609,7 +608,7 @@ possibly_unblock(State, ChPid, Update) -> State; C -> NewC = Update(C), - maybe_store_ch_record(NewC), + update_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -946,10 +945,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, State3 = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - true = maybe_store_ch_record( - C#cr{acktags = - sets:add_element(AckTag, - ChAckTags)}), + ChAckTags1 = sets:add_element(AckTag, ChAckTags), + update_ch_record(C#cr{acktags = ChAckTags1}), State2; false -> State2 end, @@ -968,9 +965,8 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record( - C#cr{consumer_count = ConsumerCount +1, - limiter = Limiter}), + update_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter = Limiter}), ok = case ConsumerCount of 0 -> rabbit_limiter:register(Limiter, self()); _ -> ok @@ -1007,7 +1003,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C = #cr{consumer_count = ConsumerCount, limiter = Limiter} -> C1 = C#cr{consumer_count = ConsumerCount -1}, - maybe_store_ch_record( + update_ch_record( case ConsumerCount of 1 -> ok = rabbit_limiter:unregister(Limiter, self()), C1#cr{limiter = rabbit_limiter:make_token()}; @@ -1061,8 +1057,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), + update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), noreply(requeue_and_run(AckTags, State)) end. @@ -1079,8 +1074,7 @@ handle_cast({ack, AckTags, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - maybe_store_ch_record(C#cr{acktags = subtract_acks( - ChAckTags, AckTags)}), + update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), {_Guids, BQS1} = BQ:ack(AckTags, BQS), noreply(State#q{backing_queue_state = BQS1}) end; @@ -1092,8 +1086,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), + update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), |