diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-21 13:05:57 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-21 13:05:57 +0000 |
commit | 19a654b6430350613e16652050fd4d18c16f13c6 (patch) | |
tree | e7b42d1c1b75838834fdd29cbc2baecdb231dd01 | |
parent | 15e93f4bd518848965fe22556200402270a1112d (diff) | |
parent | 0e40b583638ec14811852aa3281df677c4b7ccdb (diff) | |
download | rabbitmq-server-19a654b6430350613e16652050fd4d18c16f13c6.tar.gz |
merge from default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 86 |
2 files changed, 62 insertions, 34 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 2152cab3..93c85617 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1191,6 +1191,16 @@ messages to the channel's consumers. </para></listitem> </varlistentry> + <varlistentry> + <term>confirm</term> + <listitem><para>True if the channel is in confirm mode, false otherwise.</para></listitem> + </varlistentry> + <varlistentry> + <term>unconfirmed</term> + <listitem><para>Number of published messages not yet + confirmed. On channels not in confirm mode, this + remains 0.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b92206ad..40337843 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -41,8 +41,10 @@ -define(STATISTICS_KEYS, [pid, transactional, + confirm, consumer_count, messages_unacknowledged, + unconfirmed, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -280,12 +282,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MsgSeqNos, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}), + {MXs, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], UC}, State), erase_queue_stats(QPid), - noreply(queue_blocked(QPid, record_confirms(MsgSeqNos, - State#ch{unconfirmed = UC1}))). + noreply( + queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -471,38 +473,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc) -> +remove_queue_unconfirmed(none, _QPid, Acc, _State) -> Acc; -remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> +remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). + remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), + State). -record_confirm(undefined, State) -> State; -record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State). +record_confirm(undefined, _, State) -> + State; +record_confirm(MsgSeqNo, XName, State) -> + record_confirms([{MsgSeqNo, XName}], State). record_confirms([], State) -> State; -record_confirms(MsgSeqNos, State = #ch{confirmed = C}) -> - State#ch{confirmed = [MsgSeqNos | C]}. +record_confirms(MXs, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MXs | C]}. confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {DoneMessages, UC2} = + {MXs, UC1} = lists:foldl( fun(MsgSeqNo, {_DMs, UC0} = Acc) -> case gb_trees:lookup(MsgSeqNo, UC0) of none -> Acc; - {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) + {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) end end, {[], UC}, MsgSeqNos), - record_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + record_confirms(MXs, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> +remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), + maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of - 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)} + 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -555,7 +561,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, + State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1222,20 +1228,20 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_route), - record_confirm(MsgSeqNo, State); -process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_consumers), - record_confirm(MsgSeqNo, State); -process_routing_result(routed, [], MsgSeqNo, _, State) -> - record_confirm(MsgSeqNo, State); -process_routing_result(routed, _, undefined, _, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_route), + record_confirm(MsgSeqNo, XName, State); +process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + record_confirm(MsgSeqNo, XName, State); +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> + record_confirm(MsgSeqNo, XName, State); +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC), + UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1243,9 +1249,18 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -send_confirms(State = #ch{confirmed = C}) -> - send_confirms(lists:append(C), State #ch{confirmed = []}). - +send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) -> + C1 = lists:append(C), + MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of + fine -> + [ begin maybe_incr_stats([{ExchangeName, 1}], + confirm, State), + MsgSeqNo + end || {MsgSeqNo, ExchangeName} <- C1]; + _ -> + [MsgSeqNo || {MsgSeqNo, _} <- C1] + end, + send_confirms(MsgSeqNos, State #ch{confirmed = []}). send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> @@ -1255,7 +1270,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; - false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), case Ms of @@ -1283,8 +1298,11 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); +i(unconfirmed, #ch{unconfirmed = UC}) -> + gb_trees:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); |