summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-01 12:43:27 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-01 12:43:27 +0000
commitf2ca90882a6f4e5b67a990d023b91d45101dc8f6 (patch)
tree46eccb0e18735c65bafe48a883ad64e89d9703fb
parentda9e6d2ba8ca5eba30dec82c6c06d1217bd1b7b9 (diff)
parent61398037deb3e03dce203ff05f9fab3c6e05ae52 (diff)
downloadrabbitmq-server-f2ca90882a6f4e5b67a990d023b91d45101dc8f6.tar.gz
merge default into bug23201
-rw-r--r--src/rabbit_channel.erl81
1 files changed, 57 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2e6658b..ef85c318 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,15 +50,17 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm_enabled, published_count, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ held_confirms, unconfirmed, queues_for_msg, exchange_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -195,6 +197,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
confirm_multiple = false,
held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ exchange_for_msg = dict:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
@@ -284,16 +287,19 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
handle_cast(flush_multiple_acks, State) ->
{noreply, flush_multiple(State)};
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNo, From},
+ State = #ch{exchange_for_msg = EFM}) ->
+ {ok, ExchangeName} = dict:find(MsgSeqNo, EFM),
+ {noreply, send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
- State = #ch{queues_for_msg = QFM}) ->
+ State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) ->
State1 = dict:fold(
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ 0 -> {ok, ExchangeName} = dict:find(Msg, EFM),
+ send_or_enqueue_ack(Msg, QPid, ExchangeName, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -455,11 +461,13 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, _QPid, State) ->
+send_or_enqueue_ack(undefined, _QPid, _EN, State) ->
State;
-send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
+ State = #ch{confirm_multiple = false}) ->
+ maybe_incr_confirm_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
@@ -467,7 +475,9 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
WriterPid, #'basic.ack'{delivery_tag = MSN}),
State1
end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
+ State = #ch{confirm_multiple = true}) ->
+ maybe_incr_confirm_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
@@ -475,9 +485,17 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
State1#ch{held_confirms = gb_sets:add(MSN, As)})
end, State).
+maybe_incr_confirm_stats(QPid, ExchangeName, State) ->
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+ case QPid of
+ undefined -> ok;
+ _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State)
+ end.
+
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
+ State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM,
+ exchange_for_msg = EFM}) ->
%% clears references to MsgSeqNo and does ConfirmFun
case gb_sets:is_element(MsgSeqNo, UC) of
true ->
@@ -494,6 +512,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
State#ch{
queues_for_msg =
dict:erase(MsgSeqNo, QFM),
+ exchange_for_msg =
+ dict:erase(MsgSeqNo, EFM),
unconfirmed = Unconfirmed1});
_ -> State#ch{queues_for_msg =
dict:store(MsgSeqNo, Qs1, QFM)}
@@ -548,16 +568,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
content = DecodedContent,
guid = rabbit_guid:guid(),
is_persistent = IsPersistent},
+ io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State2),
+ io:format("did~n"),
{noreply, case TxnKey of
none -> State2;
_ -> add_tx_participants(DeliveredQPids, State2)
@@ -1220,21 +1242,24 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, _, undefined, _, State) ->
+process_routing_result(unroutable, _QPids, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ send_or_enqueue_ack(MsgSeqNo, undefined, XName, State);
+process_routing_result(not_delivered, _QPids, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ send_or_enqueue_ack(MsgSeqNo, undefined, XName, State);
+process_routing_result(routed, [], XName, MsgSeqNo, _Msg, State) ->
+ send_or_enqueue_ack(MsgSeqNo, undefined, XName, State);
+process_routing_result(routed, _QPids, _XName, undefined, _Msg, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _,
- State = #ch{queues_for_msg = QFM}) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg,
+ State = #ch{queues_for_msg = QFM,
+ exchange_for_msg = EFM}) ->
+ EFM1 = dict:store(MsgSeqNo, XName, EFM),
+ io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, XName]),
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = QFM1}.
+ State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1270,8 +1295,16 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{username = Username}) -> Username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE,
+ confirm_multiple = CM}) -> case {CE, CM} of
+ {false, _} -> none;
+ {_, false} -> single;
+ {_, true} -> multiple
+ end;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_sets:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);