diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-23 21:52:36 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-23 21:52:36 +0000 |
commit | a37660ab435b588a7d5782297f80c9db07279428 (patch) | |
tree | 30573744c32264bd7559dd52669ec35adac3633a /src/rabbit_amqqueue_process.erl | |
parent | a5b96a8c71ce505fb4ce03cc9e58682cc6d5f228 (diff) | |
download | rabbitmq-server-a37660ab435b588a7d5782297f80c9db07279428.tar.gz |
various simplifying refactors
these are actually all quite trivial; the diffs are large mainly due
to whitspace and renames
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 186 |
1 files changed, 93 insertions, 93 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5a759430..9f7b6143 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -598,21 +598,21 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case BQ:is_duplicate(Message, BQS) of - {false, BQS1} -> - deliver_msgs_to_consumers( - fun (true, State1 = #q{backing_queue_state = BQS2}) -> - true = BQ:is_empty(BQS2), - {AckTag, BQS3} = BQ:publish_delivered( - Message, Props, SenderPid, BQS2), - {{Message, Delivered, AckTag}, - true, State1#q{backing_queue_state = BQS3}}; - (false, State1) -> - {{Message, Delivered, undefined}, - true, discard(Delivery, State1)} - end, false, State#q{backing_queue_state = BQS1}); - {true, BQS1} -> - {true, State#q{backing_queue_state = BQS1}} + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case IsDuplicate of + false -> deliver_msgs_to_consumers( + fun (true, State2 = #q{backing_queue_state = BQS2}) -> + true = BQ:is_empty(BQS2), + {AckTag, BQS3} = BQ:publish_delivered( + Message, Props, SenderPid, BQS2), + {{Message, Delivered, AckTag}, + true, State2#q{backing_queue_state = BQS3}}; + (false, State2) -> + {{Message, Delivered, undefined}, + true, discard(Delivery, State2)} + end, false, State1); + true -> {true, State1} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, @@ -738,35 +738,34 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, +handle_ch_down(DownPid, State = #q{active_consumers = AC, + exclusive_consumer = Holder, senders = Senders}) -> - Senders1 = case pmon:is_monitored(DownPid, Senders) of - false -> Senders; - true -> credit_flow:peer_down(DownPid), - pmon:demonitor(DownPid, Senders) - end, + State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end}, case lookup_ch(DownPid) of not_found -> - {ok, State#q{senders = Senders1}}; + {ok, State1}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> QName = qname(State), - _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission + AC1 = remove_consumers(ChPid, AC, QName), + _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission ok = erase_ch_record(C), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - active_consumers = remove_consumers( - ChPid, State#q.active_consumers, - QName), - senders = Senders1}, - case should_auto_delete(State1) of - true -> {stop, State1}; + Holder1 = case Holder of + {DownPid, _} -> none; + Other -> Other + end, + State2 = State1#q{active_consumers = AC1, + exclusive_consumer = Holder1}, + case should_auto_delete(State2) of + true -> {stop, State2}; false -> {ok, requeue_and_run(queue:to_list(ChAckTags), - ensure_expiry_timer(State1))} + ensure_expiry_timer(State2))} end end. @@ -1184,64 +1183,66 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag}, State2} -> - State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = - ch_record(ChPid, LimiterPid), - ChAckTags1 = queue:in(AckTag, ChAckTags), - update_ch_record(C#cr{acktags = ChAckTags1}), - State2; - false -> State2 - end, + {{Message, IsDelivered, AckTag}, + #q{backing_queue = BQ, backing_queue_state = BQS} = State2} -> + case AckRequired of + true -> C = #cr{acktags = ChAckTags} = + ch_record(ChPid, LimiterPid), + ChAckTags1 = queue:in(AckTag, ChAckTags), + update_ch_record(C#cr{acktags = ChAckTags1}); + false -> ok + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, BQ:len(BQS), Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, - _From, State = #q{exclusive_consumer = Holder}) -> + _From, State = #q{active_consumers = AC, + exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = Count, - limiter = Limiter} = ch_record(ChPid, LimiterPid), - Limiter1 = case LimiterActive of - true -> rabbit_limiter:activate(Limiter); - false -> Limiter - end, - Limiter2 = case CreditArgs of - none -> Limiter1; - {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, Drain) - end, - C1 = update_ch_record(C#cr{consumer_count = Count + 1, - limiter = Limiter2}), - case is_empty(State) of - true -> send_drained(C1); - false -> ok - end, - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck, - args = OtherArgs}, - ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> Holder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), OtherArgs), - AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), - State2 = State1#q{active_consumers = AC1}, - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State2), - reply(ok, run_message_queue(State2)) + in_use -> reply({error, exclusive_consume_unavailable}, State); + ok -> C = #cr{consumer_count = Count, + limiter = Limiter} = + ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, + Crd, Drain) + end, + C1 = update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter2}), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck, + args = OtherArgs}, + AC1 = add_consumer({ChPid, Consumer}, AC), + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> Holder + end, + State1 = State#q{active_consumers = AC1, + has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck, qname(State1), OtherArgs), + notify_decorators( + basic_consume, [{consumer_tag, ConsumerTag}], State1), + reply(ok, run_message_queue(State1)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder}) -> + State = #q{active_consumers = AC, + exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of not_found -> @@ -1249,7 +1250,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C = #cr{consumer_count = Count, limiter = Limiter, blocked_consumers = Blocked} -> - emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), + AC1 = remove_consumer(ChPid, ConsumerTag, AC), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), Limiter1 = case Count of 1 -> rabbit_limiter:deactivate(Limiter); @@ -1259,14 +1260,13 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, update_ch_record(C#cr{consumer_count = Count - 1, limiter = Limiter2, blocked_consumers = Blocked1}), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, ConsumerTag} -> none; - _ -> Holder - end, - active_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.active_consumers)}, + Holder1 = case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end, + State1 = State#q{active_consumers = AC1, + exclusive_consumer = Holder1}, + emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), notify_decorators( basic_cancel, [{consumer_tag, ConsumerTag}], State1), case should_auto_delete(State1) of |