summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-17 11:10:06 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-17 11:10:06 +0100
commitb25c1f560c34ea3595b017499e7759c0ab7decb3 (patch)
tree66200b9278d5eb49ce33d61e349cdcedad1519ca
parent525962e167fdebb69f10d19422e0c3aebfa8c6fc (diff)
downloadrabbitmq-server-b25c1f560c34ea3595b017499e7759c0ab7decb3.tar.gz
rename maybe_store_ch_record to update_ch_record and drop assertions
...and some associated cosmetic shuffling and inlining. The assertions aren't all that useful and just clutter the code.
-rw-r--r--src/rabbit_amqqueue_process.erl47
1 files changed, 20 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d41db65a..82ed2ec8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -348,19 +348,18 @@ ch_record(ChPid) ->
C = #cr{} -> C
end.
-store_ch_record(C = #cr{ch_pid = ChPid}) ->
- put({ch, ChPid}, C).
-
-maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
- acktags = ChAckTags,
- unsent_message_count = UnsentMessageCount}) ->
+update_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ unsent_message_count = UnsentMessageCount}) ->
case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C),
- false;
- _ -> store_ch_record(C),
- true
+ {0, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
end.
+store_ch_record(C = #cr{ch_pid = ChPid}) ->
+ put({ch, ChPid}, C),
+ ok.
+
erase_ch_record(#cr{ch_pid = ChPid,
limiter = Limiter,
monitor_ref = MonitorRef}) ->
@@ -408,7 +407,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
- true = maybe_store_ch_record(NewC),
+ update_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -426,7 +425,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
- true = maybe_store_ch_record(C#cr{is_limit_active = true}),
+ update_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
@@ -609,7 +608,7 @@ possibly_unblock(State, ChPid, Update) ->
State;
C ->
NewC = Update(C),
- maybe_store_ch_record(NewC),
+ update_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -946,10 +945,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State3 =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- true = maybe_store_ch_record(
- C#cr{acktags =
- sets:add_element(AckTag,
- ChAckTags)}),
+ ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
end,
@@ -968,9 +965,8 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- true = maybe_store_ch_record(
- C#cr{consumer_count = ConsumerCount +1,
- limiter = Limiter}),
+ update_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter = Limiter}),
ok = case ConsumerCount of
0 -> rabbit_limiter:register(Limiter, self());
_ -> ok
@@ -1007,7 +1003,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C = #cr{consumer_count = ConsumerCount,
limiter = Limiter} ->
C1 = C#cr{consumer_count = ConsumerCount -1},
- maybe_store_ch_record(
+ update_ch_record(
case ConsumerCount of
1 -> ok = rabbit_limiter:unregister(Limiter, self()),
C1#cr{limiter = rabbit_limiter:make_token()};
@@ -1061,8 +1057,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
+ update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}),
noreply(requeue_and_run(AckTags, State))
end.
@@ -1079,8 +1074,7 @@ handle_cast({ack, AckTags, ChPid},
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- maybe_store_ch_record(C#cr{acktags = subtract_acks(
- ChAckTags, AckTags)}),
+ update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}),
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
noreply(State#q{backing_queue_state = BQS1})
end;
@@ -1092,8 +1086,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
+ update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),