summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-14 16:30:06 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-14 16:30:06 +0000
commitc756d9590074d4f27f26cb0a91c85147a7e7ab12 (patch)
tree579dc5c2faab9b9514a128a9a9896dbcf64a9fb7
parent9334a5782e87701c0e950507bdf1be4ca8d402ae (diff)
parent2c56b0e6f768017b3ead54217a19cc3e158e141c (diff)
downloadrabbitmq-server-c756d9590074d4f27f26cb0a91c85147a7e7ab12.tar.gz
merge from default
-rw-r--r--docs/rabbitmqctl.1.xml12
-rw-r--r--src/rabbit_channel.erl121
2 files changed, 97 insertions, 36 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 2152cab3..ccc7c970 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1191,6 +1191,18 @@
messages to the channel's consumers.
</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>confirm</term>
+ <listitem><para>Confirm mode for the channel. Either
+ <command>none</command>, <command>single</command> or
+ <command>multiple</command>.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>unconfirmed</term>
+ <listitem><para>Number of published messages not yet
+ confirmed. On channels not in confirm mode, this
+ remains 0.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e2c3694b..f2b74dd1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -56,8 +56,10 @@
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -282,18 +284,30 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast({confirm, MsgSeqNos, From}, State) ->
- {noreply, confirm(MsgSeqNos, From, State)}.
+handle_cast({confirm, MsgSeqNos, From},
+ State= #ch{stats_timer = StatsTimer}) ->
+ case rabbit_event:stats_level(StatsTimer) of
+ fine -> {noreply, group_and_confirm(MsgSeqNos, From, State)};
+ _ -> {noreply, confirm(MsgSeqNos, From, undefined, State)}
+ end.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{unconfirmed = UC}) ->
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
%% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MsgSeqNos, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}),
- State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}),
+ {EMs, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}),
+ State1 = case lists:usort(EMs) of
+ [] -> State;
+ [{XName, [MsgSeqNo]} | EMs1] ->
+ EMs2 = group_confirms_by_exchange(EMs1,
+ [{XName, [MsgSeqNo]}]),
+ lists:fold(fun({XName1, MsgSeqNos}, State0) ->
+ send_confirms(MsgSeqNos, XName1, State0)
+ end, State#ch{unconfirmed = UC1}, EMs2)
+ end,
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
@@ -468,30 +482,60 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc) ->
+remove_queue_unconfirmed(none, _QX, Acc) ->
Acc;
-remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
+remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+ remove_qmsg(MsgSeqNo, QPid, QX, Acc)).
+
+group_and_confirm([], _QPid, State) ->
+ State;
+group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ EMs = lists:foldl(
+ fun(MsgSeqNo, EMs) ->
+ case gb_trees:lookup(MsgSeqNo, UC) of
+ {value, {_, XName}} -> [{MsgSeqNo, XName} | EMs];
+ none -> EMs
+ end
+ end, [], MsgSeqNos),
+ case lists:usort(EMs) of
+ [{XName, MsgSeqNo} | EMs1] ->
+ lists:foldl(
+ fun({XName1, MsgSeqNosE}, State0) ->
+ confirm(MsgSeqNosE, QPid, XName1, State0)
+ end, State,
+ group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}]));
+ [] ->
+ State
+ end.
+
+group_confirms_by_exchange([], Acc) ->
+ Acc;
+group_confirms_by_exchange([{E, Msg1} | EMs], [{E, Msgs} | Acc]) ->
+ group_confirms_by_exchange(EMs, [{E, [Msg1 | Msgs]} | Acc]);
+group_confirms_by_exchange([{E, Msg1} | EMs], Acc) ->
+ group_confirms_by_exchange(EMs, [{E, [Msg1]} | Acc]).
-confirm([], _QPid, State) ->
+confirm([], _QPid, _XName, State) ->
State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {DoneMessages, UC2} =
+confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) ->
+ {{DoneMessages, UC1}, UniqueSeqNos} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
+ fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) ->
case gb_trees:lookup(MsgSeqNo, UC0) of
none -> Acc;
- {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
+ {value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc),
+ USN + 1}
end
- end, {[], UC}, MsgSeqNos),
- send_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+ end, {{[], UC}, 0}, MsgSeqNos),
+ maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State),
+ send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}).
-remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
+remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MsgSeqNos, UC}) ->
Qs1 = sets:del_element(QPid, Qs),
case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)}
+ 0 -> {[{XName, MsgSeqNo} | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -544,7 +588,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1202,20 +1246,20 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_confirms([MsgSeqNo], State);
-process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_confirms([MsgSeqNo], State);
-process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_confirms([MsgSeqNo], State);
-process_routing_result(routed, _, undefined, _, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ send_confirms([MsgSeqNo], XName, State);
+process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ send_confirms([MsgSeqNo], XName, State);
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ send_confirms([MsgSeqNo], XName, State);
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
#ch{unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC),
+ UC1 = gb_trees:insert(MsgSeqNo, {sets:from_list(QPids), XName}, UC),
State#ch{unconfirmed = UC1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
@@ -1223,12 +1267,14 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-send_confirms([], State) ->
+send_confirms([], _, State) ->
State;
-send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
+send_confirms([MsgSeqNo], XName,
+ State = #ch{writer_pid = WriterPid}) ->
send_confirm(MsgSeqNo, WriterPid),
- State;
-send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ maybe_incr_stats([{XName, 1}], confirm, State);
+send_confirms(Cs, XName,
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SCs) + 1;
@@ -1242,7 +1288,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
multiple = true})
end,
[ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
- State.
+ maybe_incr_stats([{XName, length(Cs)}], confirm, State).
send_confirm(undefined, _WriterPid) ->
ok;
@@ -1262,8 +1308,11 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_trees:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);