summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-16 17:49:38 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-16 17:49:38 +0100
commitf86c0afd2fc0dbb12bf2b1b7cea345440e13cb1d (patch)
tree266dcffb8e143fba294d89a5f9c71bf43b6695d7
parent003197e2abef80a31c8008fe5419b2572d4c0037 (diff)
parenta2ccf055a08811f7f37b7456f6bc8b598867baed (diff)
downloadrabbitmq-server-f86c0afd2fc0dbb12bf2b1b7cea345440e13cb1d.tar.gz
merge bug24428 into default
-rw-r--r--docs/examples-to-end.xsl7
-rw-r--r--src/file_handle_cache.erl42
-rw-r--r--src/rabbit_channel.erl342
3 files changed, 214 insertions, 177 deletions
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl
index a0a74178..4db1d5c4 100644
--- a/docs/examples-to-end.xsl
+++ b/docs/examples-to-end.xsl
@@ -2,7 +2,10 @@
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
version='1.0'>
-<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" />
+<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN"
+ doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd"
+ indent="yes"
+/>
<!-- Don't copy examples through in place -->
<xsl:template match="*[@role='example-prefix']"/>
@@ -27,7 +30,7 @@
<varlistentry>
<term><command><xsl:copy-of select="text()"/></command></term>
<listitem>
- <xsl:copy-of select="following-sibling::para[@role='example']"/>
+ <xsl:copy-of select="following-sibling::para[@role='example' and preceding-sibling::screen[1] = current()]"/>
</listitem>
</varlistentry>
</xsl:for-each>
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 776ac43a..e14dfe22 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -159,7 +159,8 @@
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
--define(CLIENT_ETS_TABLE, ?MODULE).
+-define(CLIENT_ETS_TABLE, file_handle_cache_client).
+-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
%%----------------------------------------------------------------------------
@@ -802,7 +803,8 @@ init([]) ->
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainLimit]),
Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
- {ok, #fhc_state { elders = dict:new(),
+ Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
+ {ok, #fhc_state { elders = Elders,
limit = Limit,
open_count = 0,
open_pending = pending_new(),
@@ -818,28 +820,27 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
elders = Elders,
clients = Clients })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
Item = #pending { kind = open,
pid = Pid,
requested = Requested,
from = From },
ok = track_client(Pid, Clients),
- State1 = State #fhc_state { elders = Elders1 },
- case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
+ case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
true -> case ets:lookup(Clients, Pid) of
[#cstate { opened = 0 }] ->
true = ets:update_element(
Clients, Pid, {#cstate.blocked, true}),
{noreply,
- reduce(State1 #fhc_state {
+ reduce(State #fhc_state {
open_pending = pending_in(Item, Pending) })};
[#cstate { opened = Opened }] ->
true = ets:update_element(
Clients, Pid,
{#cstate.pending_closes, Opened}),
- {reply, close, State1}
+ {reply, close, State}
end;
- false -> {noreply, run_pending_item(Item, State1)}
+ false -> {noreply, run_pending_item(Item, State)}
end;
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
@@ -887,21 +888,20 @@ handle_cast({register_callback, Pid, MFA},
handle_cast({update, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, State #fhc_state { elders = Elders1 }};
+ {noreply, State};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
- Elders1 = case EldestUnusedSince of
- undefined -> dict:erase(Pid, Elders);
- _ -> dict:store(Pid, EldestUnusedSince, Elders)
- end,
+ true = case EldestUnusedSince of
+ undefined -> ets:delete(Elders, Pid);
+ _ -> ets:insert(Elders, {Pid, EldestUnusedSince})
+ end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
{noreply, adjust_alarm(State, process_pending(
- update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 })))};
+ update_counts(open, Pid, -1, State)))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -922,6 +922,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
[#cstate { opened = Opened, obtained = Obtained }] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
+ true = ets:delete(Elders, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
{noreply, adjust_alarm(
State,
@@ -930,11 +931,12 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
open_count = OpenCount - Opened,
open_pending = filter_pending(FilterFun, OpenPending),
obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) }))}.
+ obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
-terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+terminate(_Reason, State = #fhc_state { clients = Clients,
+ elders = Elders }) ->
ets:delete(Clients),
+ ets:delete(Elders),
State.
code_change(_OldVsn, State, _Extra) ->
@@ -1090,7 +1092,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
timer_ref = TRef }) ->
Now = now(),
{CStates, Sum, ClientCount} =
- dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
[#cstate { pending_closes = PendingCloses,
opened = Opened,
blocked = Blocked } = CState] =
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dfe84644..d2f55277 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,8 +35,8 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
- user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ user, virtual_host, most_recently_declared_queue, queue_monitors,
+ consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -189,9 +189,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_monitors = dict:new(),
consumer_mapping = dict:new(),
- blocking = dict:new(),
- consumer_monitors = dict:new(),
+ blocking = sets:new(),
+ queue_consumers = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -275,7 +276,7 @@ handle_cast(terminate, State) ->
handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(monitor_consumer(ConsumerTag, State));
+ noreply(consumer_monitor(ConsumerTag, State));
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
@@ -299,13 +300,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
+ noreply(State3#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -323,15 +324,13 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
noreply([ensure_stats_timer],
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-handle_info({'DOWN', MRef, process, QPid, Reason},
- State = #ch{consumer_monitors = ConsumerMonitors}) ->
- noreply(
- case dict:find(MRef, ConsumerMonitors) of
- error ->
- handle_publishing_queue_down(QPid, Reason, State);
- {ok, ConsumerTag} ->
- handle_consuming_queue_down(MRef, ConsumerTag, State)
- end);
+handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
+ State1 = handle_publishing_queue_down(QPid, Reason, State),
+ State2 = queue_blocked(QPid, State1),
+ State3 = handle_consuming_queue_down(QPid, State2),
+ erase_queue_stats(QPid),
+ noreply(State3#ch{queue_monitors =
+ dict:erase(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -516,17 +515,16 @@ check_name(_Kind, NameBin) ->
NameBin.
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case dict:find(QPid, Blocking) of
- error -> State;
- {ok, MRef} -> true = erlang:demonitor(MRef),
- Blocking1 = dict:erase(QPid, Blocking),
- ok = case dict:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ case sets:is_element(QPid, Blocking) of
+ false -> State;
+ true -> Blocking1 = sets:del_element(QPid, Blocking),
+ ok = case sets:size(Blocking1) of
+ 0 -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ _ -> ok
+ end,
+ demonitor_queue(QPid, State#ch{blocking = Blocking1})
end.
record_confirm(undefined, _, State) ->
@@ -545,38 +543,41 @@ confirm(MsgSeqNos, QPid, State) ->
{MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
- {MXs, UMQ1, UQM1} =
- lists:foldl(
- fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], UMQ, UQM}, MsgSeqNos),
- {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
- UQM1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> gb_trees:delete(QPid, UQM);
- false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
- end;
- none ->
- UQM
- end,
+process_confirms(MsgSeqNos, QPid, Nack, State) ->
+ lists:foldl(
+ fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack);
+ none -> Acc
+ end
+ end, {[], State}, MsgSeqNos).
+
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
+ {MXs, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}},
+ Nack) ->
+ State1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> UQM1 = gb_trees:delete(QPid, UQM),
+ demonitor_queue(
+ QPid, State#ch{unconfirmed_qm = UQM1});
+ false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State#ch{unconfirmed_qm = UQM1}
+ end;
+ none ->
+ State
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
%% If QPid somehow died initiating a nack, clear the message from
%% internal data-structures. Also, cleanup empty entries.
case (Nack orelse gb_sets:is_empty(Qs1)) of
- true ->
- {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
- false ->
- {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
+ true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
+ {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
+ false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
+ {MXs, State1#ch{unconfirmed_mq = UMQ1}}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -693,11 +694,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -707,7 +708,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State3#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -746,12 +747,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
{ok, Q} ->
State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- {Q, undefined},
+ dict:store(ActualConsumerTag, Q,
ConsumerMapping)},
{noreply,
case NoWait of
- true -> monitor_consumer(ActualConsumerTag, State1);
+ true -> consumer_monitor(ActualConsumerTag, State1);
false -> State1
end};
{{error, exclusive_consume_unavailable}, _Q} ->
@@ -768,22 +768,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
_, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors}) ->
+ queue_consumers = QCons}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, {Q, MRef}} ->
- ConsumerMonitors1 =
- case MRef of
- undefined -> ConsumerMonitors;
- _ -> true = erlang:demonitor(MRef),
- dict:erase(MRef, ConsumerMonitors)
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ QCons1 =
+ case dict:find(QPid, QCons) of
+ error -> QCons;
+ {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
+ case gb_sets:is_empty(CTags1) of
+ true -> dict:erase(QPid, QCons);
+ false -> dict:store(QPid, CTags1, QCons)
+ end
end,
- NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
- ConsumerMapping),
- consumer_monitors = ConsumerMonitors1},
+ NewState = demonitor_queue(
+ Q, State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1}),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1108,10 +1112,12 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
- QPid <- QPids],
+ QPids -> State2 = lists:foldl(fun monitor_queue/2,
+ State1#ch{blocking =
+ sets:from_list(QPids)},
+ QPids),
ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ {noreply, State2}
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1120,23 +1126,51 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- capabilities = Capabilities}) ->
+consumer_monitor(ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QCons,
+ capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
- {#amqqueue{pid = QPid} = Q, undefined} =
- dict:fetch(ConsumerTag, ConsumerMapping),
- MRef = erlang:monitor(process, QPid),
- State#ch{consumer_mapping =
- dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
- consumer_monitors =
- dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
+ QCons1 = dict:update(QPid,
+ fun (CTags) ->
+ gb_sets:insert(ConsumerTag, CTags)
+ end,
+ gb_sets:singleton(ConsumerTag),
+ QCons),
+ monitor_queue(QPid, State#ch{queue_consumers = QCons1});
_ ->
State
end.
+monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case (not dict:is_key(QPid, QMons) andalso
+ queue_monitor_needed(QPid, State)) of
+ true -> MRef = erlang:monitor(process, QPid),
+ State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
+ false -> State
+ end.
+
+demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case (dict:is_key(QPid, QMons) andalso
+ not queue_monitor_needed(QPid, State)) of
+ true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
+ State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ false -> State
+ end.
+
+queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
+ queue_consumers = QCons,
+ blocking = Blocking,
+ unconfirmed_qm = UQM}) ->
+ StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ ConsumerMonitored = dict:is_key(QPid, QCons),
+ QueueBlocked = sets:is_element(QPid, Blocking),
+ ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
+ StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1157,21 +1191,25 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
{true, fun send_nacks/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- erase_queue_stats(QPid),
- State3 = SendFun(MXs, State2),
- queue_blocked(QPid, State3).
-
-handle_consuming_queue_down(MRef, ConsumerTag,
- State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- writer_pid = WriterPid}) ->
- ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
- ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
- Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
- nowait = true},
- ok = rabbit_writer:send_command(WriterPid, Cancel),
+ SendFun(MXs, State2).
+
+handle_consuming_queue_down(QPid,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QCons,
+ writer_pid = WriterPid}) ->
+ ConsumerTags = case dict:find(QPid, QCons) of
+ error -> gb_sets:new();
+ {ok, CTags} -> CTags
+ end,
+ ConsumerMapping1 =
+ gb_sets:fold(fun (CTag, CMap) ->
+ Cancel = #'basic.cancel'{consumer_tag = CTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ dict:erase(CTag, CMap)
+ end, ConsumerMapping, ConsumerTags),
State#ch{consumer_mapping = ConsumerMapping1,
- consumer_monitors = ConsumerMonitors1}.
+ queue_consumers = dict:erase(QPid, QCons)}.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
@@ -1271,9 +1309,8 @@ ack(Acked, State) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
[{QPid, length(MsgIds)} | L]
end, [], Acked),
- maybe_incr_stats(QIncs, ack, State),
ok = notify_limiter(State#ch.limiter, Acked),
- State.
+ maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
uncommitted_ack_q = queue:new()}.
@@ -1307,8 +1344,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
consumer_queues(Consumers) ->
lists:usort([QPid ||
- {_Key, {#amqqueue{pid = QPid}, _MRef}}
- <- dict:to_list(Consumers)]).
+ {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -1334,38 +1370,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
XName, MsgSeqNo, Message, State),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1),
- State1.
+ QPid <- DeliveredQPids]], publish, State1).
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State));
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State));
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ #ch{unconfirmed_mq = UMQ} = State,
UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
SingletonSet = gb_sets:singleton(MsgSeqNo),
- UQM1 = lists:foldl(
- fun (QPid, UQM2) ->
- maybe_monitor(QPid),
- case gb_trees:lookup(QPid, UQM2) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- gb_trees:update(QPid, MsgSeqNos1, UQM2);
- none ->
- gb_trees:insert(QPid, SingletonSet, UQM2)
- end
- end, UQM, QPids),
- State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
+ lists:foldl(
+ fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
+ case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State0#ch{unconfirmed_qm = UQM1};
+ none ->
+ UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
+ monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ end
+ end, State#ch{unconfirmed_mq = UMQ1}, QPids).
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1385,11 +1420,13 @@ send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx_status = failed}).
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- C1 = lists:append(C),
- MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
- MsgSeqNo
- end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []});
+ {MsgSeqNos, State1} =
+ lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
+ {[MsgSeqNo | MSNs],
+ maybe_incr_stats([{ExchangeName, 1}], confirm,
+ State0)}
+ end, {[], State}, lists:append(C)),
+ send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1469,30 +1506,26 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, _) ->
- ok.
+maybe_incr_redeliver_stats(_, _, State) ->
+ State.
-maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
- fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
- _ -> ok
+ fine -> lists:foldl(fun ({QX, Inc}, State0) ->
+ incr_stats(QX, Inc, Measure, State0)
+ end, State, QXIncs);
+ _ -> State
end.
-incr_stats({QPid, _} = QX, Inc, Measure) ->
- maybe_monitor(QPid),
- update_measures(queue_exchange_stats, QX, Inc, Measure);
-incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- maybe_monitor(QPid),
- update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(X, Inc, Measure) ->
- update_measures(exchange_stats, X, Inc, Measure).
-
-maybe_monitor(QPid) ->
- case get({monitoring, QPid}) of
- undefined -> erlang:monitor(process, QPid),
- put({monitoring, QPid}, true);
- _ -> ok
- end.
+incr_stats({QPid, _} = QX, Inc, Measure, State) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(X, Inc, Measure, State) ->
+ update_measures(exchange_stats, X, Inc, Measure),
+ State.
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
@@ -1528,7 +1561,6 @@ emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
end.
erase_queue_stats(QPid) ->
- erase({monitoring, QPid}),
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].