diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-12-08 10:31:01 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-12-08 10:31:01 +0000 |
commit | b4d7b61f03173fde8fd53f6eb23e22fb3fcce700 (patch) | |
tree | b9fecbb6164cf34e30ec3297275a2a66817f6966 | |
parent | 5d3b291386560f1d9629be9293f2d624779ef322 (diff) | |
parent | 53c5b3e08d7a21d6ca81bc6048476ffcf508618d (diff) | |
download | rabbitmq-server-b4d7b61f03173fde8fd53f6eb23e22fb3fcce700.tar.gz |
Merge default into bug23201.
-rw-r--r-- | src/rabbit_channel.erl | 81 |
1 files changed, 57 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c8ad00a..11342c60 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,15 +50,17 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + held_confirms, unconfirmed, queues_for_msg, exchange_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(STATISTICS_KEYS, [pid, transactional, + confirm, consumer_count, messages_unacknowledged, + unconfirmed, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -195,6 +197,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, confirm_multiple = false, held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), + exchange_for_msg = dict:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, @@ -284,16 +287,23 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> handle_cast(flush_multiple_acks, State) -> {noreply, flush_multiple(State)}; -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNo, From}, + State = #ch{exchange_for_msg = EFM}) -> + State1 = case dict:find(MsgSeqNo, EFM) of + {ok, ExchangeName} -> + send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State); + _ -> State %% no entry in EFM means it's already been confirmed + end, + {noreply, State1}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, - State = #ch{queues_for_msg = QFM}) -> + State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> State1 = dict:fold( fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> send_or_enqueue_ack(Msg, QPid, State0); + 0 -> {ok, ExchangeName} = dict:find(Msg, EFM), + send_or_enqueue_ack(Msg, QPid, ExchangeName, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -455,11 +465,13 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -send_or_enqueue_ack(undefined, _QPid, State) -> +send_or_enqueue_ack(undefined, _QPid, _EN, State) -> State; -send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> +send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, + State = #ch{confirm_multiple = false}) -> + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( @@ -467,16 +479,25 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> delivery_tag = MSN}), State1 end, State); -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> +send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, + State = #ch{confirm_multiple = true}) -> + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> start_confirm_timer( State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). +maybe_incr_confirm_queue_stats(QPid, ExchangeName, State) -> + case QPid of + undefined -> ok; + _ -> maybe_incr_stats([{{QPid, ExchangeName}, 1}], confirm, State) + end. + do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> + State = #ch{unconfirmed = UC, + queues_for_msg = QFM, + exchange_for_msg = EFM}) -> %% clears references to MsgSeqNo and does ConfirmFun case gb_sets:is_element(MsgSeqNo, UC) of true -> @@ -492,6 +513,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, State#ch{ queues_for_msg = dict:erase(MsgSeqNo, QFM), + exchange_for_msg = + dict:erase(MsgSeqNo, EFM), unconfirmed = Unconfirmed1}); _ -> State#ch{queues_for_msg = dict:store(MsgSeqNo, Qs1, QFM)} @@ -552,7 +575,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, + State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1219,21 +1242,23 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_route), - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, [], MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, _, undefined, _, State) -> +process_routing_result(unroutable, _QPids, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_route), + send_or_enqueue_ack(MsgSeqNo, undefined, XName, State); +process_routing_result(not_delivered, _QPids, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + send_or_enqueue_ack(MsgSeqNo, undefined, XName, State); +process_routing_result(routed, [], XName, MsgSeqNo, _Msg, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, XName, State); +process_routing_result(routed, _QPids, _XName, undefined, _Msg, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, - State = #ch{queues_for_msg = QFM}) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg, + State = #ch{queues_for_msg = QFM, + exchange_for_msg = EFM}) -> + EFM1 = dict:store(MsgSeqNo, XName, EFM), QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = QFM1}. + State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1269,8 +1294,16 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{username = Username}) -> Username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE, + confirm_multiple = CM}) -> case {CE, CM} of + {false, _} -> none; + {_, false} -> single; + {_, true} -> multiple + end; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); +i(unconfirmed, #ch{unconfirmed = UC}) -> + gb_sets:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); |