diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-10 09:36:00 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-10 09:36:00 +0000 |
commit | 01ea1730e76befd5d85c1b90ba23600b3ddb327d (patch) | |
tree | 8dec68ace199a101cd6b6a3e4d71bd8d48eb82e5 | |
parent | dd1225bb279d07d57bde0415b26666f59497658c (diff) | |
download | rabbitmq-server-01ea1730e76befd5d85c1b90ba23600b3ddb327d.tar.gz |
minor cosmetic changes and refactoring in amqqueue_process
these bring it more in line with the bug21673 branch
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 52 |
1 files changed, 25 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 29d428c7..399e9ee5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -176,7 +176,6 @@ deliver_immediately(Message, Delivered, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers, next_msg_id = NextId}) -> - ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -254,8 +253,8 @@ deliver_or_enqueue(Txn, ChPid, Message, State) -> end. deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), - State). + run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)), + State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -289,7 +288,7 @@ possibly_unblock(State, ChPid, Update) -> move_consumers(ChPid, State#q.blocked_consumers, State#q.active_consumers), - run_poke_burst( + run_message_queue( State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}) end @@ -349,19 +348,19 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(MessageBuffer, State). +run_message_queue(State = #q{message_buffer = MessageBuffer}) -> + run_message_queue(MessageBuffer, State). -run_poke_burst(MessageBuffer, State) -> +run_message_queue(MessageBuffer, State) -> case queue:out(MessageBuffer) of {{value, {Message, Delivered}}, BufferTail} -> case deliver_immediately(Message, Delivered, State) of {offered, true, NewState} -> persist_delivery(qname(State), Message, Delivered), - run_poke_burst(BufferTail, NewState); + run_message_queue(BufferTail, NewState); {offered, false, NewState} -> persist_auto_ack(qname(State), Message), - run_poke_burst(BufferTail, NewState); + run_message_queue(BufferTail, NewState); {not_offered, NewState} -> NewState#q{message_buffer = MessageBuffer} end; @@ -654,15 +653,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ack_required = not(NoAck)}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - if ConsumerCount == 0 -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok + case ConsumerCount of + 0 -> ok = rabbit_limiter:register(LimiterPid, self()); + _ -> ok end, - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, + ExclusiveConsumer = case ExclusiveConsume of + true -> {ChPid, ConsumerTag}; + false -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), @@ -673,7 +671,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, add_consumer( ChPid, Consumer, State1#q.blocked_consumers)}; - false -> run_poke_burst( + false -> run_message_queue( State1#q{ active_consumers = add_consumer( @@ -692,10 +690,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, reply(ok, State); C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), - if ConsumerCount == 1 -> - ok = rabbit_limiter:unregister(LimiterPid, self()); - true -> - ok + case ConsumerCount of + 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); + _ -> ok end, ok = maybe_send_reply(ChPid, OkMsg), NewState = @@ -717,8 +714,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, active_consumers = ActiveConsumers}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, - State); + Length = queue:len(MessageBuffer), + reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> @@ -738,8 +735,8 @@ handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> reply({ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}); -handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, - exclusive_consumer = Holder}) -> +handle_call({claim_queue, ReaderPid}, _From, + State = #q{owner = Owner, exclusive_consumer = Holder}) -> case Owner of none -> case check_exclusive_access(Holder, true, State) of @@ -752,7 +749,8 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% pid... reply(locked, State); ok -> - reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) + MonitorRef = erlang:monitor(process, ReaderPid), + reply(ok, State#q{owner = {ReaderPid, MonitorRef}}) end; {ReaderPid, _MonitorRef} -> reply(ok, State); |