summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-21 13:05:57 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-21 13:05:57 +0000
commit19a654b6430350613e16652050fd4d18c16f13c6 (patch)
treee7b42d1c1b75838834fdd29cbc2baecdb231dd01
parent15e93f4bd518848965fe22556200402270a1112d (diff)
parent0e40b583638ec14811852aa3281df677c4b7ccdb (diff)
downloadrabbitmq-server-19a654b6430350613e16652050fd4d18c16f13c6.tar.gz
merge from default
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--src/rabbit_channel.erl86
2 files changed, 62 insertions, 34 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 2152cab3..93c85617 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1191,6 +1191,16 @@
messages to the channel's consumers.
</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>confirm</term>
+ <listitem><para>True if the channel is in confirm mode, false otherwise.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>unconfirmed</term>
+ <listitem><para>Number of published messages not yet
+ confirmed. On channels not in confirm mode, this
+ remains 0.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b92206ad..40337843 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,8 +41,10 @@
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -280,12 +282,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
%% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MsgSeqNos, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}),
+ {MXs, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}, State),
erase_queue_stats(QPid),
- noreply(queue_blocked(QPid, record_confirms(MsgSeqNos,
- State#ch{unconfirmed = UC1}))).
+ noreply(
+ queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -471,38 +473,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc) ->
+remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
Acc;
-remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
+remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+ remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
+ State).
-record_confirm(undefined, State) -> State;
-record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State).
+record_confirm(undefined, _, State) ->
+ State;
+record_confirm(MsgSeqNo, XName, State) ->
+ record_confirms([{MsgSeqNo, XName}], State).
record_confirms([], State) ->
State;
-record_confirms(MsgSeqNos, State = #ch{confirmed = C}) ->
- State#ch{confirmed = [MsgSeqNos | C]}.
+record_confirms(MXs, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MXs | C]}.
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {DoneMessages, UC2} =
+ {MXs, UC1} =
lists:foldl(
fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UC0) of
none -> Acc;
- {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
+ {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
end
end, {[], UC}, MsgSeqNos),
- record_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
-remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
+remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
Qs1 = sets:del_element(QPid, Qs),
+ maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)}
+ 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -555,7 +561,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1222,20 +1228,20 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_route),
- record_confirm(MsgSeqNo, State);
-process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, [], MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, _, undefined, _, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
#ch{unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC),
+ UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
State#ch{unconfirmed = UC1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
@@ -1243,9 +1249,18 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-send_confirms(State = #ch{confirmed = C}) ->
- send_confirms(lists:append(C), State #ch{confirmed = []}).
-
+send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) ->
+ C1 = lists:append(C),
+ MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of
+ fine ->
+ [ begin maybe_incr_stats([{ExchangeName, 1}],
+ confirm, State),
+ MsgSeqNo
+ end || {MsgSeqNo, ExchangeName} <- C1];
+ _ ->
+ [MsgSeqNo || {MsgSeqNo, _} <- C1]
+ end,
+ send_confirms(MsgSeqNos, State #ch{confirmed = []}).
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1255,7 +1270,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SCs) + 1;
- false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
case Ms of
@@ -1283,8 +1298,11 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_trees:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);