summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-12-23 21:52:36 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-12-23 21:52:36 +0000
commita37660ab435b588a7d5782297f80c9db07279428 (patch)
tree30573744c32264bd7559dd52669ec35adac3633a
parenta5b96a8c71ce505fb4ce03cc9e58682cc6d5f228 (diff)
downloadrabbitmq-server-a37660ab435b588a7d5782297f80c9db07279428.tar.gz
various simplifying refactors
these are actually all quite trivial; the diffs are large mainly due to whitspace and renames
-rw-r--r--src/rabbit_amqqueue_process.erl186
1 files changed, 93 insertions, 93 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5a759430..9f7b6143 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -598,21 +598,21 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- case BQ:is_duplicate(Message, BQS) of
- {false, BQS1} ->
- deliver_msgs_to_consumers(
- fun (true, State1 = #q{backing_queue_state = BQS2}) ->
- true = BQ:is_empty(BQS2),
- {AckTag, BQS3} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS2),
- {{Message, Delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}};
- (false, State1) ->
- {{Message, Delivered, undefined},
- true, discard(Delivery, State1)}
- end, false, State#q{backing_queue_state = BQS1});
- {true, BQS1} ->
- {true, State#q{backing_queue_state = BQS1}}
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ false -> deliver_msgs_to_consumers(
+ fun (true, State2 = #q{backing_queue_state = BQS2}) ->
+ true = BQ:is_empty(BQS2),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State2#q{backing_queue_state = BQS3}};
+ (false, State2) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State2)}
+ end, false, State1);
+ true -> {true, State1}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
@@ -738,35 +738,34 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+handle_ch_down(DownPid, State = #q{active_consumers = AC,
+ exclusive_consumer = Holder,
senders = Senders}) ->
- Senders1 = case pmon:is_monitored(DownPid, Senders) of
- false -> Senders;
- true -> credit_flow:peer_down(DownPid),
- pmon:demonitor(DownPid, Senders)
- end,
+ State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end},
case lookup_ch(DownPid) of
not_found ->
- {ok, State#q{senders = Senders1}};
+ {ok, State1};
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
QName = qname(State),
- _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
+ AC1 = remove_consumers(ChPid, AC, QName),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- active_consumers = remove_consumers(
- ChPid, State#q.active_consumers,
- QName),
- senders = Senders1},
- case should_auto_delete(State1) of
- true -> {stop, State1};
+ Holder1 = case Holder of
+ {DownPid, _} -> none;
+ Other -> Other
+ end,
+ State2 = State1#q{active_consumers = AC1,
+ exclusive_consumer = Holder1},
+ case should_auto_delete(State2) of
+ true -> {stop, State2};
false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
- ensure_expiry_timer(State1))}
+ ensure_expiry_timer(State2))}
end
end.
@@ -1184,64 +1183,66 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag}, State2} ->
- State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} =
- ch_record(ChPid, LimiterPid),
- ChAckTags1 = queue:in(AckTag, ChAckTags),
- update_ch_record(C#cr{acktags = ChAckTags1}),
- State2;
- false -> State2
- end,
+ {{Message, IsDelivered, AckTag},
+ #q{backing_queue = BQ, backing_queue_state = BQS} = State2} ->
+ case AckRequired of
+ true -> C = #cr{acktags = ChAckTags} =
+ ch_record(ChPid, LimiterPid),
+ ChAckTags1 = queue:in(AckTag, ChAckTags),
+ update_ch_record(C#cr{acktags = ChAckTags1});
+ false -> ok
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, BQ:len(BQS), Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
- _From, State = #q{exclusive_consumer = Holder}) ->
+ _From, State = #q{active_consumers = AC,
+ exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = Count,
- limiter = Limiter} = ch_record(ChPid, LimiterPid),
- Limiter1 = case LimiterActive of
- true -> rabbit_limiter:activate(Limiter);
- false -> Limiter
- end,
- Limiter2 = case CreditArgs of
- none -> Limiter1;
- {Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, Drain)
- end,
- C1 = update_ch_record(C#cr{consumer_count = Count + 1,
- limiter = Limiter2}),
- case is_empty(State) of
- true -> send_drained(C1);
- false -> ok
- end,
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck,
- args = OtherArgs},
- ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> Holder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), OtherArgs),
- AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
- State2 = State1#q{active_consumers = AC1},
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State2),
- reply(ok, run_message_queue(State2))
+ in_use -> reply({error, exclusive_consume_unavailable}, State);
+ ok -> C = #cr{consumer_count = Count,
+ limiter = Limiter} =
+ ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag,
+ Crd, Drain)
+ end,
+ C1 = update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter2}),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck,
+ args = OtherArgs},
+ AC1 = add_consumer({ChPid, Consumer}, AC),
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> Holder
+ end,
+ State1 = State#q{active_consumers = AC1,
+ has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, qname(State1), OtherArgs),
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder}) ->
+ State = #q{active_consumers = AC,
+ exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
not_found ->
@@ -1249,7 +1250,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C = #cr{consumer_count = Count,
limiter = Limiter,
blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
+ AC1 = remove_consumer(ChPid, ConsumerTag, AC),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
Limiter1 = case Count of
1 -> rabbit_limiter:deactivate(Limiter);
@@ -1259,14 +1260,13 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, ConsumerTag} -> none;
- _ -> Holder
- end,
- active_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.active_consumers)},
+ Holder1 = case Holder of
+ {ChPid, ConsumerTag} -> none;
+ _ -> Holder
+ end,
+ State1 = State#q{active_consumers = AC1,
+ exclusive_consumer = Holder1},
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
notify_decorators(
basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of