summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-12-08 10:31:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2010-12-08 10:31:01 +0000
commitb4d7b61f03173fde8fd53f6eb23e22fb3fcce700 (patch)
treeb9fecbb6164cf34e30ec3297275a2a66817f6966
parent5d3b291386560f1d9629be9293f2d624779ef322 (diff)
parent53c5b3e08d7a21d6ca81bc6048476ffcf508618d (diff)
downloadrabbitmq-server-b4d7b61f03173fde8fd53f6eb23e22fb3fcce700.tar.gz
Merge default into bug23201.
-rw-r--r--src/rabbit_channel.erl81
1 files changed, 57 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c8ad00a..11342c60 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,15 +50,17 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ held_confirms, unconfirmed, queues_for_msg, exchange_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -195,6 +197,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
confirm_multiple = false,
held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ exchange_for_msg = dict:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
@@ -284,16 +287,23 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
handle_cast(flush_multiple_acks, State) ->
{noreply, flush_multiple(State)};
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNo, From},
+ State = #ch{exchange_for_msg = EFM}) ->
+ State1 = case dict:find(MsgSeqNo, EFM) of
+ {ok, ExchangeName} ->
+ send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State);
+ _ -> State %% no entry in EFM means it's already been confirmed
+ end,
+ {noreply, State1}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
- State = #ch{queues_for_msg = QFM}) ->
+ State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) ->
State1 = dict:fold(
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ 0 -> {ok, ExchangeName} = dict:find(Msg, EFM),
+ send_or_enqueue_ack(Msg, QPid, ExchangeName, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -455,11 +465,13 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, _QPid, State) ->
+send_or_enqueue_ack(undefined, _QPid, _EN, State) ->
State;
-send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
+ State = #ch{confirm_multiple = false}) ->
+ maybe_incr_confirm_queue_stats(QPid, ExchangeName, State),
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
@@ -467,16 +479,25 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
delivery_tag = MSN}),
State1
end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
+ State = #ch{confirm_multiple = true}) ->
+ maybe_incr_confirm_queue_stats(QPid, ExchangeName, State),
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
start_confirm_timer(
State1#ch{held_confirms = gb_sets:add(MSN, As)})
end, State).
+maybe_incr_confirm_queue_stats(QPid, ExchangeName, State) ->
+ case QPid of
+ undefined -> ok;
+ _ -> maybe_incr_stats([{{QPid, ExchangeName}, 1}], confirm, State)
+ end.
+
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
+ State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM,
+ exchange_for_msg = EFM}) ->
%% clears references to MsgSeqNo and does ConfirmFun
case gb_sets:is_element(MsgSeqNo, UC) of
true ->
@@ -492,6 +513,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
State#ch{
queues_for_msg =
dict:erase(MsgSeqNo, QFM),
+ exchange_for_msg =
+ dict:erase(MsgSeqNo, EFM),
unconfirmed = Unconfirmed1});
_ -> State#ch{queues_for_msg =
dict:store(MsgSeqNo, Qs1, QFM)}
@@ -552,7 +575,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1219,21 +1242,23 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, _, undefined, _, State) ->
+process_routing_result(unroutable, _QPids, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ send_or_enqueue_ack(MsgSeqNo, undefined, XName, State);
+process_routing_result(not_delivered, _QPids, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ send_or_enqueue_ack(MsgSeqNo, undefined, XName, State);
+process_routing_result(routed, [], XName, MsgSeqNo, _Msg, State) ->
+ send_or_enqueue_ack(MsgSeqNo, undefined, XName, State);
+process_routing_result(routed, _QPids, _XName, undefined, _Msg, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _,
- State = #ch{queues_for_msg = QFM}) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg,
+ State = #ch{queues_for_msg = QFM,
+ exchange_for_msg = EFM}) ->
+ EFM1 = dict:store(MsgSeqNo, XName, EFM),
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = QFM1}.
+ State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1269,8 +1294,16 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{username = Username}) -> Username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE,
+ confirm_multiple = CM}) -> case {CE, CM} of
+ {false, _} -> none;
+ {_, false} -> single;
+ {_, true} -> multiple
+ end;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_sets:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);