diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-02-28 11:00:19 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-02-28 11:00:19 +0000 |
commit | 618af82a36778eb9f354e1b430d23cf711795eaf (patch) | |
tree | dfa8837b2e32eb3d80fbd7415bbdf64df5aba79f /src/rabbit_channel.erl | |
parent | cd64ab0f9b9fe0689a74681fed4e65d7ce333b8f (diff) | |
parent | e1ff3d59146b9dd32532aa58a119a4582bbc2ecd (diff) | |
download | rabbitmq-server-618af82a36778eb9f354e1b430d23cf711795eaf.tar.gz |
Merge bug23580 (Add statistics for 'basic.return')
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 225 |
1 files changed, 139 insertions, 86 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 87357b89..d8a332f3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,21 +20,22 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/9, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1]). +-export([emit_stats/1, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed}). + confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, + confirmed, capabilities}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -66,10 +67,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/7 :: - (channel_number(), pid(), pid(), rabbit_types:user(), - rabbit_types:vhost(), pid(), - fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> +-spec(start_link/9 :: + (channel_number(), pid(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -89,15 +90,17 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, - StartLimiterFun) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User, - VHost, CollectorPid, StartLimiterFun], []). +start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, + CollectorPid, StartLimiterFun) -> + gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -106,7 +109,7 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). flush(Pid) -> - gen_server2:call(Pid, flush). + gen_server2:call(Pid, flush, infinity). shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -146,14 +149,18 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). +ready_for_close(Pid) -> + gen_server2:cast(Pid, ready_for_close). + %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, - StartLimiterFun]) -> +init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, + CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, + protocol = Protocol, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, @@ -173,8 +180,10 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty(), - confirmed = []}, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + confirmed = [], + capabilities = Capabilities}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -218,14 +227,11 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, normal, State#ch{state = terminating}} + {stop, normal, State} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, normal, terminating(Reason#amqp_error{method = MethodName}, - State)}; - exit:normal -> - {stop, normal, State}; + send_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -233,6 +239,11 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State), hibernate}; +handle_cast(ready_for_close, State = #ch{state = closing, + writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), + {stop, normal, State}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -278,19 +289,22 @@ handle_info(timeout, State) -> noreply(State); handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{unconfirmed = UC}) -> - %% TODO: this does a complete scan and partial rebuild of the - %% tree, which is quite efficient. To do better we'd need to - %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MXs, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}, State), + State = #ch{unconfirmed_qm = UQM}) -> + MsgSeqNos = case gb_trees:lookup(QPid, UQM) of + {value, MsgSet} -> gb_sets:to_list(MsgSet); + none -> [] + end, + %% We remove the MsgSeqNos from UQM before calling + %% process_confirms to prevent each MsgSeqNo being removed from + %% the set one by one which which would be inefficient + State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, + {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), erase_queue_stats(QPid), - State1 = case Reason of - normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); - _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) - end, - noreply(queue_blocked(QPid, State1)). + State3 = (case Reason of + normal -> fun record_confirms/2; + _ -> fun send_nacks/2 + end)(MXs, State2), + noreply(queue_blocked(QPid, State3)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -302,18 +316,16 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. -terminate(_Reason, State = #ch{state = terminating}) -> - terminate(State); - terminate(Reason, State) -> - Res = rollback_and_notify(State), + {Res, _State1} = rollback_and_notify(State), case Reason of normal -> ok = Res; shutdown -> ok = Res; {shutdown, _Term} -> ok = Res; _ -> ok end, - terminate(State). + pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -351,10 +363,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> - ok = rollback_and_notify(State), - Reader ! {channel_exit, Channel, Reason}, - State#ch{state = terminating}. +send_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid}) -> + {CloseChannel, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [ReaderPid, Channel, Reason]), + %% something bad's happened: rollback_and_notify may not be 'ok' + {_Result, State1} = rollback_and_notify(State), + case CloseChannel of + Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + _ -> ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} + end. return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> @@ -476,13 +500,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc, _State) -> - Acc; -remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> - remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), - State). - record_confirm(undefined, _, State) -> State; record_confirm(MsgSeqNo, XName, State) -> @@ -495,25 +512,43 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = +confirm(MsgSeqNos, QPid, State) -> + {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + record_confirms(MXs, State1). + +process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}) -> + {MXs, UMQ1, UQM1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UC0} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; - {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) + fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, + State); + none -> Acc end - end, {[], UC}, MsgSeqNos), - record_confirms(MXs, State#ch{unconfirmed = UC1}). + end, {[], UMQ, UQM}, MsgSeqNos), + {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> - Qs1 = sets:del_element(QPid, Qs), +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), - case sets:size(Qs1) of - 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} + UQM1 = case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> gb_trees:delete(QPid, UQM); + false -> gb_trees:update(QPid, MsgSeqNos1, UQM) + end; + none -> + UQM + end, + Qs1 = gb_sets:del_element(QPid, Qs), + case gb_sets:is_empty(Qs1) of + true -> + {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; + false -> + {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -526,11 +561,20 @@ handle_method(#'channel.open'{}, _, _State) -> handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = rollback_and_notify(State), - ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), +handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> stop; +handle_method(#'channel.close'{}, _, State = #ch{state = closing}) -> + {reply, #'channel.close_ok'{}, State}; + +handle_method(_Method, _, State = #ch{state = closing}) -> + {noreply, State}; + +handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> + {ok, State1} = rollback_and_notify(State), + ReaderPid ! {channel_closing, self()}, + {noreply, State1}; + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -1081,9 +1125,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, - WriterPid, Reason) -> - {_Close, ReplyCode, ReplyText} = - rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), + #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -1170,10 +1213,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey, NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}). +rollback_and_notify(State = #ch{state = closing}) -> + {ok, State}; rollback_and_notify(State = #ch{transaction_id = none}) -> - notify_queues(State); + {notify_queues(State), State#ch{state = closing}}; rollback_and_notify(State) -> - notify_queues(internal_rollback(State)). + State1 = internal_rollback(State), + {notify_queues(State1), State1#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( @@ -1240,12 +1286,12 @@ is_message_persistent(Content) -> end. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_route), + ok = basic_return(Msg, State, no_route), maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + ok = basic_return(Msg, State, no_consumers), maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], return_not_delivered, State), record_confirm(MsgSeqNo, XName, State); @@ -1254,10 +1300,21 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed = UC} = State, - [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), - State#ch{unconfirmed = UC1}. + #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), + SingletonSet = gb_sets:singleton(MsgSeqNo), + UQM1 = lists:foldl( + fun (QPid, UQM2) -> + maybe_monitor(QPid), + case gb_trees:lookup(QPid, UQM2) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), + gb_trees:update(QPid, MsgSeqNos1, UQM2); + none -> + gb_trees:insert(QPid, SingletonSet, UQM2) + end + end, UQM, QPids), + State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1293,11 +1350,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UC) of + CutOff = case gb_trees:is_empty(UMQ) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1309,10 +1366,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -terminate(_State) -> - pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); @@ -1324,8 +1377,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> - gb_trees:size(UC); +i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> + gb_trees:size(UMQ); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); |