summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-16 02:36:59 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-16 02:36:59 +0100
commit4d8382acd9c5fa1d637c86ca65f557dd4d33472d (patch)
tree3599f5dff1c87b6595ab11d5e37a929210a17aef
parent4d5c11c51bbc8d456c26878ace2b06cdece40248 (diff)
downloadrabbitmq-server-4d8382acd9c5fa1d637c86ca65f557dd4d33472d.tar.gz
cosmetic changes and some minor refactoring
-rw-r--r--src/rabbit_channel.erl175
1 files changed, 82 insertions, 93 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9b5d7ec0..119a3d03 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,11 +35,10 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
- user, virtual_host, most_recently_declared_queue,
+ user, virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state,
- queue_monitors}).
+ unconfirmed_qm, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -190,6 +189,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_monitors = dict:new(),
consumer_mapping = dict:new(),
blocking = gb_sets:new(),
queue_consumers = dict:new(),
@@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unconfirmed_qm = gb_trees:empty(),
confirmed = [],
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost),
- queue_monitors = dict:new()},
+ trace_state = rabbit_trace:init(VHost)},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> emit_stats(State) end),
@@ -327,9 +326,9 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
- erase_queue_stats(QPid),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
dict:erase(QPid, State3#ch.queue_monitors)});
@@ -555,32 +554,30 @@ process_confirms(MsgSeqNos, QPid, Nack, State) ->
end, {[], State}, MsgSeqNos).
remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
- {MXs, State = #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM}},
+ {MXs, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}},
Nack) ->
- State1 =
- case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> demonitor_queue(
- QPid, State#ch{unconfirmed_qm =
- gb_trees:delete(QPid, UQM)});
- false -> State#ch{unconfirmed_qm =
- gb_trees:update(QPid, MsgSeqNos1, UQM)}
- end;
- none ->
- State
- end,
+ State1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> UQM1 = gb_trees:delete(QPid, UQM),
+ demonitor_queue(
+ QPid, State#ch{unconfirmed_qm = UQM1});
+ false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State#ch{unconfirmed_qm = UQM1}
+ end;
+ none ->
+ State
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
%% If QPid somehow died initiating a nack, clear the message from
%% internal data-structures. Also, cleanup empty entries.
case (Nack orelse gb_sets:is_empty(Qs1)) of
- true ->
- {[{MsgSeqNo, XName} | MXs],
- State1#ch{unconfirmed_mq = gb_trees:delete(MsgSeqNo, UMQ)}};
- false ->
- {MXs, State1#ch{unconfirmed_mq =
- gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ)}}
+ true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
+ {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
+ false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
+ {MXs, State1#ch{unconfirmed_mq = UMQ1}}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -771,7 +768,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
_, State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QueueConsumers}) ->
+ queue_consumers = QCons}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -779,20 +776,18 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
return_ok(State, NoWait, OkMsg);
{ok, Q = #amqqueue{pid = QPid}} ->
ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
- QueueConsumers1 =
- case dict:find(QPid, QueueConsumers) of
- error -> QueueConsumers;
- {ok, CTags} -> case gb_sets:size(CTags) of
- 1 -> dict:erase(QPid, QueueConsumers);
- _ -> dict:store(QPid,
- gb_sets:delete(ConsumerTag,
- CTags),
- QueueConsumers)
+ QCons1 =
+ case dict:find(QPid, QCons) of
+ error -> QCons;
+ {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
+ case gb_sets:is_empty(CTags1) of
+ true -> dict:erase(QPid, QCons);
+ false -> dict:store(QPid, CTags1, QCons)
end
end,
- State1 = State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QueueConsumers1},
- State2 = demonitor_queue(Q, State1),
+ NewState = demonitor_queue(
+ Q, State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1}),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -807,10 +802,10 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
consumer_tag = ConsumerTag}))
end) of
ok ->
- {noreply, State2};
+ {noreply, NewState};
{error, not_found} ->
%% Spec requires we ignore this situation.
- return_ok(State2, NoWait, OkMsg)
+ return_ok(NewState, NoWait, OkMsg)
end
end;
@@ -1116,15 +1111,13 @@ handle_method(#'channel.flow'{active = false}, _,
State1 = State#ch{limiter = Limiter1},
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
- [] ->
- {reply, #'channel.flow_ok'{active = false}, State1};
- QPids ->
- State2 = lists:foldl(fun (QPid, State0) -> monitor_queue(QPid, State0)
- end,
- State1#ch{blocking = gb_sets:from_list(QPids)},
- QPids),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State2}
+ [] -> {reply, #'channel.flow_ok'{active = false}, State1};
+ QPids -> State2 = lists:foldl(fun monitor_queue/2,
+ State1#ch{blocking =
+ gb_sets:from_list(QPids)},
+ QPids),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, State2}
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1133,60 +1126,59 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-monitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) ->
- case dict:is_key(QPid, QueueMonitors) of
+monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case dict:is_key(QPid, QMons) of
false -> case queue_monitor_needed(QPid, State) of
false -> State;
- true -> QueueMonitors1 = dict:store(QPid,
- erlang:monitor(process,
- QPid),
- QueueMonitors),
- State#ch{queue_monitors = QueueMonitors1}
+ true -> MRef = erlang:monitor(process, QPid),
+ State#ch{queue_monitors =
+ dict:store(QPid, MRef, QMons)}
end;
true -> State
end.
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QueueConsumers,
+ queue_consumers = QCons,
capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
#amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
- QueueConsumers1 =
- dict:update(QPid,
- fun (CTags) -> gb_sets:insert(ConsumerTag, CTags) end,
- gb_sets:singleton(ConsumerTag),
- QueueConsumers),
- monitor_queue(QPid, State#ch{queue_consumers = QueueConsumers1});
+ QCons1 = dict:update(QPid,
+ fun (CTags) ->
+ gb_sets:insert(ConsumerTag, CTags)
+ end,
+ gb_sets:singleton(ConsumerTag),
+ QCons),
+ monitor_queue(QPid, State#ch{queue_consumers = QCons1});
_ ->
State
end.
-demonitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) ->
- case dict:find(QPid, QueueMonitors) of
+demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case dict:find(QPid, QMons) of
error -> State;
{ok, MRef} -> case queue_monitor_needed(QPid, State) of
false -> true = erlang:demonitor(MRef),
State#ch{queue_monitors =
- dict:erase(QPid, QueueMonitors)};
+ dict:erase(QPid, QMons)};
true -> State
end
end.
queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
- queue_consumers = QueueConsumers,
+ queue_consumers = QCons,
blocking = Blocking,
unconfirmed_qm = UQM}) ->
- StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QueueConsumers),
- QueueBlocked = gb_sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
+ StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ ConsumerMonitored = dict:is_key(QPid, QCons),
+ QueueBlocked = gb_sets:is_element(QPid, Blocking),
+ ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
none -> []
end,
@@ -1209,14 +1201,13 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QueueConsumers,
+ queue_consumers = QCons,
writer_pid = WriterPid}) ->
- ConsumerTags = case dict:find(QPid, QueueConsumers) of
+ ConsumerTags = case dict:find(QPid, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
- QueueConsumers1 = dict:erase(QPid, QueueConsumers),
- ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> dict:erase(CTag, CMap) end,
+ ConsumerMapping1 = gb_sets:fold(fun dict:erase/2,
ConsumerMapping, ConsumerTags),
[begin
Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
@@ -1224,7 +1215,7 @@ handle_consuming_queue_down(QPid,
ok = rabbit_writer:send_command(WriterPid, Cancel)
end || ConsumerTag <- gb_sets:to_list(ConsumerTags)],
State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QueueConsumers1}.
+ queue_consumers = dict:erase(QPid, QCons)}.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
@@ -1389,13 +1380,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- State1 = maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State1);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State));
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- State1 = maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State1);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State));
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1409,13 +1400,11 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
case gb_trees:lookup(QPid, UQM) of
{value, MsgSeqNos} ->
MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- State0#ch{unconfirmed_qm =
- gb_trees:update(QPid, MsgSeqNos1, UQM)};
+ UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State0#ch{unconfirmed_qm = UQM1};
none ->
- State1 =
- State0#ch{unconfirmed_qm =
- gb_trees:insert(QPid, SingletonSet, UQM)},
- monitor_queue(QPid, State1)
+ UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
+ monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
@@ -1437,12 +1426,12 @@ send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx_status = failed}).
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- C1 = lists:append(C),
{MsgSeqNos, State1} =
lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
{[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm, State0)}
- end, {[], State}, C1),
+ maybe_incr_stats([{ExchangeName, 1}], confirm,
+ State0)}
+ end, {[], State}, lists:append(C)),
send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).