diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-16 02:36:59 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-16 02:36:59 +0100 |
commit | 4d8382acd9c5fa1d637c86ca65f557dd4d33472d (patch) | |
tree | 3599f5dff1c87b6595ab11d5e37a929210a17aef | |
parent | 4d5c11c51bbc8d456c26878ace2b06cdece40248 (diff) | |
download | rabbitmq-server-4d8382acd9c5fa1d637c86ca65f557dd4d33472d.tar.gz |
cosmetic changes and some minor refactoring
-rw-r--r-- | src/rabbit_channel.erl | 175 |
1 files changed, 82 insertions, 93 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9b5d7ec0..119a3d03 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,11 +35,10 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, - user, virtual_host, most_recently_declared_queue, + user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state, - queue_monitors}). + unconfirmed_qm, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -190,6 +189,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_monitors = dict:new(), consumer_mapping = dict:new(), blocking = gb_sets:new(), queue_consumers = dict:new(), @@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unconfirmed_qm = gb_trees:empty(), confirmed = [], capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost), - queue_monitors = dict:new()}, + trace_state = rabbit_trace:init(VHost)}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> emit_stats(State) end), @@ -327,9 +326,9 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), - erase_queue_stats(QPid), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = dict:erase(QPid, State3#ch.queue_monitors)}); @@ -555,32 +554,30 @@ process_confirms(MsgSeqNos, QPid, Nack, State) -> end, {[], State}, MsgSeqNos). remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, - {MXs, State = #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM}}, + {MXs, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}}, Nack) -> - State1 = - case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> demonitor_queue( - QPid, State#ch{unconfirmed_qm = - gb_trees:delete(QPid, UQM)}); - false -> State#ch{unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)} - end; - none -> - State - end, + State1 = case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> UQM1 = gb_trees:delete(QPid, UQM), + demonitor_queue( + QPid, State#ch{unconfirmed_qm = UQM1}); + false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), + State#ch{unconfirmed_qm = UQM1} + end; + none -> + State + end, Qs1 = gb_sets:del_element(QPid, Qs), %% If QPid somehow died initiating a nack, clear the message from %% internal data-structures. Also, cleanup empty entries. case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> - {[{MsgSeqNo, XName} | MXs], - State1#ch{unconfirmed_mq = gb_trees:delete(MsgSeqNo, UMQ)}}; - false -> - {MXs, State1#ch{unconfirmed_mq = - gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ)}} + true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), + {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; + false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), + {MXs, State1#ch{unconfirmed_mq = UMQ1}} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -771,7 +768,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, _, State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QueueConsumers}) -> + queue_consumers = QCons}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -779,20 +776,18 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, return_ok(State, NoWait, OkMsg); {ok, Q = #amqqueue{pid = QPid}} -> ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), - QueueConsumers1 = - case dict:find(QPid, QueueConsumers) of - error -> QueueConsumers; - {ok, CTags} -> case gb_sets:size(CTags) of - 1 -> dict:erase(QPid, QueueConsumers); - _ -> dict:store(QPid, - gb_sets:delete(ConsumerTag, - CTags), - QueueConsumers) + QCons1 = + case dict:find(QPid, QCons) of + error -> QCons; + {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), + case gb_sets:is_empty(CTags1) of + true -> dict:erase(QPid, QCons); + false -> dict:store(QPid, CTags1, QCons) end end, - State1 = State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = QueueConsumers1}, - State2 = demonitor_queue(Q, State1), + NewState = demonitor_queue( + Q, State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}), %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -807,10 +802,10 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, consumer_tag = ConsumerTag})) end) of ok -> - {noreply, State2}; + {noreply, NewState}; {error, not_found} -> %% Spec requires we ignore this situation. - return_ok(State2, NoWait, OkMsg) + return_ok(NewState, NoWait, OkMsg) end end; @@ -1116,15 +1111,13 @@ handle_method(#'channel.flow'{active = false}, _, State1 = State#ch{limiter = Limiter1}, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of - [] -> - {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> - State2 = lists:foldl(fun (QPid, State0) -> monitor_queue(QPid, State0) - end, - State1#ch{blocking = gb_sets:from_list(QPids)}, - QPids), - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State2} + [] -> {reply, #'channel.flow_ok'{active = false}, State1}; + QPids -> State2 = lists:foldl(fun monitor_queue/2, + State1#ch{blocking = + gb_sets:from_list(QPids)}, + QPids), + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, State2} end; handle_method(_MethodRecord, _Content, _State) -> @@ -1133,60 +1126,59 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -monitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) -> - case dict:is_key(QPid, QueueMonitors) of +monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> + case dict:is_key(QPid, QMons) of false -> case queue_monitor_needed(QPid, State) of false -> State; - true -> QueueMonitors1 = dict:store(QPid, - erlang:monitor(process, - QPid), - QueueMonitors), - State#ch{queue_monitors = QueueMonitors1} + true -> MRef = erlang:monitor(process, QPid), + State#ch{queue_monitors = + dict:store(QPid, MRef, QMons)} end; true -> State end. consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QueueConsumers, + queue_consumers = QCons, capabilities = Capabilities}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), - QueueConsumers1 = - dict:update(QPid, - fun (CTags) -> gb_sets:insert(ConsumerTag, CTags) end, - gb_sets:singleton(ConsumerTag), - QueueConsumers), - monitor_queue(QPid, State#ch{queue_consumers = QueueConsumers1}); + QCons1 = dict:update(QPid, + fun (CTags) -> + gb_sets:insert(ConsumerTag, CTags) + end, + gb_sets:singleton(ConsumerTag), + QCons), + monitor_queue(QPid, State#ch{queue_consumers = QCons1}); _ -> State end. -demonitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) -> - case dict:find(QPid, QueueMonitors) of +demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> + case dict:find(QPid, QMons) of error -> State; {ok, MRef} -> case queue_monitor_needed(QPid, State) of false -> true = erlang:demonitor(MRef), State#ch{queue_monitors = - dict:erase(QPid, QueueMonitors)}; + dict:erase(QPid, QMons)}; true -> State end end. queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, - queue_consumers = QueueConsumers, + queue_consumers = QCons, blocking = Blocking, unconfirmed_qm = UQM}) -> - StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, - ConsumerMonitored = dict:is_key(QPid, QueueConsumers), - QueueBlocked = gb_sets:is_element(QPid, Blocking), - ConfirmMonitored = gb_trees:is_defined(QPid, UQM), + StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + ConsumerMonitored = dict:is_key(QPid, QCons), + QueueBlocked = gb_sets:is_element(QPid, Blocking), + ConfirmMonitored = gb_trees:is_defined(QPid, UQM), StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of + MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); none -> [] end, @@ -1209,14 +1201,13 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QueueConsumers, + queue_consumers = QCons, writer_pid = WriterPid}) -> - ConsumerTags = case dict:find(QPid, QueueConsumers) of + ConsumerTags = case dict:find(QPid, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, - QueueConsumers1 = dict:erase(QPid, QueueConsumers), - ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> dict:erase(CTag, CMap) end, + ConsumerMapping1 = gb_sets:fold(fun dict:erase/2, ConsumerMapping, ConsumerTags), [begin Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, @@ -1224,7 +1215,7 @@ handle_consuming_queue_down(QPid, ok = rabbit_writer:send_command(WriterPid, Cancel) end || ConsumerTag <- gb_sets:to_list(ConsumerTags)], State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = QueueConsumers1}. + queue_consumers = dict:erase(QPid, QCons)}. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, @@ -1389,13 +1380,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - State1 = maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State), - record_confirm(MsgSeqNo, XName, State1); + record_confirm(MsgSeqNo, XName, + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State)); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - State1 = maybe_incr_stats([{XName, 1}], return_not_delivered, State), - record_confirm(MsgSeqNo, XName, State1); + record_confirm(MsgSeqNo, XName, + maybe_incr_stats([{XName, 1}], return_not_delivered, State)); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> @@ -1409,13 +1400,11 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> case gb_trees:lookup(QPid, UQM) of {value, MsgSeqNos} -> MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - State0#ch{unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)}; + UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), + State0#ch{unconfirmed_qm = UQM1}; none -> - State1 = - State0#ch{unconfirmed_qm = - gb_trees:insert(QPid, SingletonSet, UQM)}, - monitor_queue(QPid, State1) + UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), + monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1}) end end, State#ch{unconfirmed_mq = UMQ1}, QPids). @@ -1437,12 +1426,12 @@ send_nacks(_, State) -> maybe_complete_tx(State#ch{tx_status = failed}). send_confirms(State = #ch{tx_status = none, confirmed = C}) -> - C1 = lists:append(C), {MsgSeqNos, State1} = lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> {[MsgSeqNo | MSNs], - maybe_incr_stats([{ExchangeName, 1}], confirm, State0)} - end, {[], State}, C1), + maybe_incr_stats([{ExchangeName, 1}], confirm, + State0)} + end, {[], State}, lists:append(C)), send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). |