diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-14 16:30:06 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-14 16:30:06 +0000 |
commit | c756d9590074d4f27f26cb0a91c85147a7e7ab12 (patch) | |
tree | 579dc5c2faab9b9514a128a9a9896dbcf64a9fb7 | |
parent | 9334a5782e87701c0e950507bdf1be4ca8d402ae (diff) | |
parent | 2c56b0e6f768017b3ead54217a19cc3e158e141c (diff) | |
download | rabbitmq-server-c756d9590074d4f27f26cb0a91c85147a7e7ab12.tar.gz |
merge from default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 12 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 121 |
2 files changed, 97 insertions, 36 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 2152cab3..ccc7c970 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1191,6 +1191,18 @@ messages to the channel's consumers. </para></listitem> </varlistentry> + <varlistentry> + <term>confirm</term> + <listitem><para>Confirm mode for the channel. Either + <command>none</command>, <command>single</command> or + <command>multiple</command>.</para></listitem> + </varlistentry> + <varlistentry> + <term>unconfirmed</term> + <listitem><para>Number of published messages not yet + confirmed. On channels not in confirm mode, this + remains 0.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e2c3694b..f2b74dd1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -56,8 +56,10 @@ -define(STATISTICS_KEYS, [pid, transactional, + confirm, consumer_count, messages_unacknowledged, + unconfirmed, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -282,18 +284,30 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast({confirm, MsgSeqNos, From}, State) -> - {noreply, confirm(MsgSeqNos, From, State)}. +handle_cast({confirm, MsgSeqNos, From}, + State= #ch{stats_timer = StatsTimer}) -> + case rabbit_event:stats_level(StatsTimer) of + fine -> {noreply, group_and_confirm(MsgSeqNos, From, State)}; + _ -> {noreply, confirm(MsgSeqNos, From, undefined, State)} + end. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{unconfirmed = UC}) -> %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MsgSeqNos, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}), - State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}), + {EMs, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], UC}), + State1 = case lists:usort(EMs) of + [] -> State; + [{XName, [MsgSeqNo]} | EMs1] -> + EMs2 = group_confirms_by_exchange(EMs1, + [{XName, [MsgSeqNo]}]), + lists:fold(fun({XName1, MsgSeqNos}, State0) -> + send_confirms(MsgSeqNos, XName1, State0) + end, State#ch{unconfirmed = UC1}, EMs2) + end, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -468,30 +482,60 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc) -> +remove_queue_unconfirmed(none, _QX, Acc) -> Acc; -remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> +remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). + remove_qmsg(MsgSeqNo, QPid, QX, Acc)). + +group_and_confirm([], _QPid, State) -> + State; +group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + EMs = lists:foldl( + fun(MsgSeqNo, EMs) -> + case gb_trees:lookup(MsgSeqNo, UC) of + {value, {_, XName}} -> [{MsgSeqNo, XName} | EMs]; + none -> EMs + end + end, [], MsgSeqNos), + case lists:usort(EMs) of + [{XName, MsgSeqNo} | EMs1] -> + lists:foldl( + fun({XName1, MsgSeqNosE}, State0) -> + confirm(MsgSeqNosE, QPid, XName1, State0) + end, State, + group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}])); + [] -> + State + end. + +group_confirms_by_exchange([], Acc) -> + Acc; +group_confirms_by_exchange([{E, Msg1} | EMs], [{E, Msgs} | Acc]) -> + group_confirms_by_exchange(EMs, [{E, [Msg1 | Msgs]} | Acc]); +group_confirms_by_exchange([{E, Msg1} | EMs], Acc) -> + group_confirms_by_exchange(EMs, [{E, [Msg1]} | Acc]). -confirm([], _QPid, State) -> +confirm([], _QPid, _XName, State) -> State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {DoneMessages, UC2} = +confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) -> + {{DoneMessages, UC1}, UniqueSeqNos} = lists:foldl( - fun(MsgSeqNo, {_DMs, UC0} = Acc) -> + fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) -> case gb_trees:lookup(MsgSeqNo, UC0) of none -> Acc; - {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) + {value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc), + USN + 1} end - end, {[], UC}, MsgSeqNos), - send_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + end, {{[], UC}, 0}, MsgSeqNos), + maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State), + send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> +remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MsgSeqNos, UC}) -> Qs1 = sets:del_element(QPid, Qs), case sets:size(Qs1) of - 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)} + 0 -> {[{XName, MsgSeqNo} | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -544,7 +588,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, + State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1202,20 +1246,20 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_route), - send_confirms([MsgSeqNo], State); -process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_confirms([MsgSeqNo], State); -process_routing_result(routed, [], MsgSeqNo, _, State) -> - send_confirms([MsgSeqNo], State); -process_routing_result(routed, _, undefined, _, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_route), + send_confirms([MsgSeqNo], XName, State); +process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + send_confirms([MsgSeqNo], XName, State); +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> + send_confirms([MsgSeqNo], XName, State); +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC), + UC1 = gb_trees:insert(MsgSeqNo, {sets:from_list(QPids), XName}, UC), State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1223,12 +1267,14 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -send_confirms([], State) -> +send_confirms([], _, State) -> State; -send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> +send_confirms([MsgSeqNo], XName, + State = #ch{writer_pid = WriterPid}) -> send_confirm(MsgSeqNo, WriterPid), - State; -send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + maybe_incr_stats([{XName, 1}], confirm, State); +send_confirms(Cs, XName, + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; @@ -1242,7 +1288,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> multiple = true}) end, [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], - State. + maybe_incr_stats([{XName, length(Cs)}], confirm, State). send_confirm(undefined, _WriterPid) -> ok; @@ -1262,8 +1308,11 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); +i(unconfirmed, #ch{unconfirmed = UC}) -> + gb_trees:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); |