summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-10 09:36:00 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-10 09:36:00 +0000
commit01ea1730e76befd5d85c1b90ba23600b3ddb327d (patch)
tree8dec68ace199a101cd6b6a3e4d71bd8d48eb82e5
parentdd1225bb279d07d57bde0415b26666f59497658c (diff)
downloadrabbitmq-server-01ea1730e76befd5d85c1b90ba23600b3ddb327d.tar.gz
minor cosmetic changes and refactoring in amqqueue_process
these bring it more in line with the bug21673 branch
-rw-r--r--src/rabbit_amqqueue_process.erl52
1 files changed, 25 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 29d428c7..399e9ee5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -176,7 +176,6 @@ deliver_immediately(Message, Delivered,
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
- ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -254,8 +253,8 @@ deliver_or_enqueue(Txn, ChPid, Message, State) ->
end.
deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
- State).
+ run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)),
+ State).
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -289,7 +288,7 @@ possibly_unblock(State, ChPid, Update) ->
move_consumers(ChPid,
State#q.blocked_consumers,
State#q.active_consumers),
- run_poke_burst(
+ run_message_queue(
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers})
end
@@ -349,19 +348,19 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-run_poke_burst(State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(MessageBuffer, State).
+run_message_queue(State = #q{message_buffer = MessageBuffer}) ->
+ run_message_queue(MessageBuffer, State).
-run_poke_burst(MessageBuffer, State) ->
+run_message_queue(MessageBuffer, State) ->
case queue:out(MessageBuffer) of
{{value, {Message, Delivered}}, BufferTail} ->
case deliver_immediately(Message, Delivered, State) of
{offered, true, NewState} ->
persist_delivery(qname(State), Message, Delivered),
- run_poke_burst(BufferTail, NewState);
+ run_message_queue(BufferTail, NewState);
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
- run_poke_burst(BufferTail, NewState);
+ run_message_queue(BufferTail, NewState);
{not_offered, NewState} ->
NewState#q{message_buffer = MessageBuffer}
end;
@@ -654,15 +653,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ack_required = not(NoAck)},
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if ConsumerCount == 0 ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
- ok
+ case ConsumerCount of
+ 0 -> ok = rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
end,
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
+ ExclusiveConsumer = case ExclusiveConsume of
+ true -> {ChPid, ConsumerTag};
+ false -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
@@ -673,7 +671,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
add_consumer(
ChPid, Consumer,
State1#q.blocked_consumers)};
- false -> run_poke_burst(
+ false -> run_message_queue(
State1#q{
active_consumers =
add_consumer(
@@ -692,10 +690,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
reply(ok, State);
C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
- if ConsumerCount == 1 ->
- ok = rabbit_limiter:unregister(LimiterPid, self());
- true ->
- ok
+ case ConsumerCount of
+ 1 -> ok = rabbit_limiter:unregister(LimiterPid, self());
+ _ -> ok
end,
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
@@ -717,8 +714,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
- State);
+ Length = queue:len(MessageBuffer),
+ reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
@@ -738,8 +735,8 @@ handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
reply({ok, queue:len(MessageBuffer)},
State#q{message_buffer = queue:new()});
-handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
- exclusive_consumer = Holder}) ->
+handle_call({claim_queue, ReaderPid}, _From,
+ State = #q{owner = Owner, exclusive_consumer = Holder}) ->
case Owner of
none ->
case check_exclusive_access(Holder, true, State) of
@@ -752,7 +749,8 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% pid...
reply(locked, State);
ok ->
- reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
+ MonitorRef = erlang:monitor(process, ReaderPid),
+ reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
end;
{ReaderPid, _MonitorRef} ->
reply(ok, State);