summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-05 08:53:23 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-05 08:53:23 +0000
commit608ba6ef42d63b95c744d5d744ab1e4181f6ce45 (patch)
tree98e329f8b8889b20e158723d77dd484d7d3fbf71
parentc8044c53b6a8eed5b685ff263b4ffbcba37a98c7 (diff)
downloadrabbitmq-server-608ba6ef42d63b95c744d5d744ab1e4181f6ce45.tar.gz
cosmetic
-rw-r--r--src/rabbit_amqqueue_process.erl101
1 files changed, 44 insertions, 57 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7719dfe7..24de9415 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -283,17 +283,16 @@ lookup_ch(ChPid) ->
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
- undefined ->
- MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = sets:new(),
- is_limit_active = false,
- txn = none,
- unsent_message_count = 0},
- put(Key, C),
- C;
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ C = #cr{consumer_count = 0,
+ ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = sets:new(),
+ is_limit_active = false,
+ txn = none,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
C = #cr{} -> C
end.
@@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-all_ch_record() ->
- [C || {{ch, _}, C} <- get()].
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
- BlockedOld = is_ch_blocked(OldCR),
- BlockedNew = is_ch_blocked(NewCR),
- if BlockedOld andalso not(BlockedNew) -> unblock;
- BlockedNew andalso not(BlockedOld) -> block;
- true -> ok
+ case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
+ {true, false} -> unblock;
+ {false, true} -> block;
+ {_, _} -> ok
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
@@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
- block ->
- {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ block -> {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
State2 = State1#q{
active_consumers = NewActiveConsumers,
@@ -396,8 +392,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end.
-deliver_from_queue_pred(IsEmpty, _State) ->
- not IsEmpty.
+deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
@@ -405,17 +400,16 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
- {CMs, GTC1} =
- lists:foldl(
- fun(Guid, {CMs, GTC0}) ->
- case dict:find(Guid, GTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(Guid, GTC0)};
- _ ->
- {CMs, GTC0}
- end
- end, {gb_trees:empty(), GTC}, Guids),
+ {CMs, GTC1} = lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {gb_trees:empty(), GTC}, Guids),
gb_trees:map(fun(ChPid, MsgSeqNos) ->
rabbit_channel:confirm(ChPid, MsgSeqNos)
end, CMs),
@@ -480,17 +474,14 @@ attempt_delivery(#delivery{txn = none,
{Delivered, State1} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
{Delivered, NeedsConfirming, State1};
-attempt_delivery(#delivery{txn = Txn,
+attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- {NeedsConfirming,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}}) ->
+ {NeedsConfirming, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- {true,
- NeedsConfirming,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
+ {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
@@ -661,9 +652,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
Now = now_micros(),
BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) ->
- Now > Expiry
- end, BQS),
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -814,8 +804,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery},
- _From, State) ->
+handle_call({deliver_immediately, Delivery}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -906,15 +895,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
case is_ch_blocked(C) of
true -> State1#q{
blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
+ add_consumer(ChPid, Consumer,
+ State1#q.blocked_consumers)};
false -> run_message_queue(
State1#q{
active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
+ add_consumer(ChPid, Consumer,
+ State1#q.active_consumers)})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),