diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-21 11:23:50 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-21 11:23:50 +0000 |
commit | ea2fcaea17c27e6d186d1244b9ad918c83dabe92 (patch) | |
tree | 2b2b91408a32040ed110a35a61db2faba60ddfaa | |
parent | b4c3bf3e2b677325d05b0cce115214f79aa362b9 (diff) | |
parent | f977d37da50bce8c8863a28d1a90534e8b486275 (diff) | |
download | rabbitmq-server-ea2fcaea17c27e6d186d1244b9ad918c83dabe92.tar.gz |
Merging default into bug21647
-rw-r--r-- | src/file_handle_cache.erl | 27 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 26 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 106 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 25 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 17 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 168 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 14 |
7 files changed, 159 insertions, 224 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 1e1f37cb..f41815d0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, + info/1]). -export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -259,11 +260,17 @@ -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). +-spec(info_keys/0 :: () -> [atom()]). +-spec(info/0 :: () -> [{atom(), any()}]). +-spec(info/1 :: ([atom()]) -> [{atom(), any()}]). -spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [obtain_count, obtain_limit]). + +%%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -494,6 +501,11 @@ set_limit(Limit) -> get_limit() -> gen_server:call(?SERVER, get_limit, infinity). +info_keys() -> ?INFO_KEYS. + +info() -> info(?INFO_KEYS). +info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -789,6 +801,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; +i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(Item, _) -> throw({bad_argument, Item}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -871,13 +889,18 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, false -> {noreply, run_pending_item(Item, State)} end; + handle_call({set_limit, Limit}, _From, State) -> {reply, ok, maybe_reduce( process_pending(State #fhc_state { limit = Limit, obtain_limit = obtain_limit(Limit) }))}; + handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> - {reply, Limit, State}. + {reply, Limit, State}; + +handle_call({info, Items}, _From, State) -> + {reply, infos(Items, State), State}. handle_cast({register_callback, Pid, MFA}, State = #fhc_state { clients = Clients }) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index d67c7f58..dc81ace6 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -61,8 +61,7 @@ -spec(map_exception/3 :: (rabbit_channel:channel_number(), rabbit_types:amqp_error() | any(), rabbit_types:protocol()) -> - {boolean(), - rabbit_channel:channel_number(), + {rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()}). -endif. @@ -301,24 +300,21 @@ clear_encoded_content(Content = #content{}) -> map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = lookup_amqp_exception(Reason, Protocol), - ShouldClose = SuggestedClose orelse (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; _ -> Protocol:method_id(FailedMethod) end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. + case SuggestedClose orelse (Channel == 0) of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end. lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ec3088dd..abda1c1f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,16 +20,16 @@ -behaviour(gen_server2). --export([start_link/8, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/9, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1]). +-export([emit_stats/1, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, @@ -67,10 +67,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/8 :: - (channel_number(), pid(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), - fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> +-spec(start_link/9 :: + (channel_number(), pid(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -90,16 +90,17 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, User, VHost, Capabilities, +start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun) -> - gen_server2:start_link(?MODULE, - [Channel, ReaderPid, WriterPid, User, VHost, - Capabilities, CollectorPid, StartLimiterFun], []). + gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -148,14 +149,18 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). +ready_for_close(Pid) -> + gen_server2:cast(Pid, ready_for_close). + %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid, - StartLimiterFun]) -> +init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, + CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, + protocol = Protocol, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, @@ -223,14 +228,11 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, normal, State#ch{state = terminating}} + {stop, normal, State} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, normal, terminating(Reason#amqp_error{method = MethodName}, - State)}; - exit:normal -> - {stop, normal, State}; + send_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -238,6 +240,11 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State), hibernate}; +handle_cast(ready_for_close, State = #ch{state = closing, + writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), + {stop, normal, State}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -307,18 +314,16 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. -terminate(_Reason, State = #ch{state = terminating}) -> - terminate(State); - terminate(Reason, State) -> - Res = rollback_and_notify(State), + {Res, _State1} = rollback_and_notify(State), case Reason of normal -> ok = Res; shutdown -> ok = Res; {shutdown, _Term} -> ok = Res; _ -> ok end, - terminate(State). + pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -356,10 +361,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> - ok = rollback_and_notify(State), - Reader ! {channel_exit, Channel, Reason}, - State#ch{state = terminating}. +send_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid}) -> + {CloseChannel, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [ReaderPid, Channel, Reason]), + %% something bad's happened: rollback_and_notify may not be 'ok' + {_Result, State1} = rollback_and_notify(State), + case CloseChannel of + Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + _ -> ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} + end. return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> @@ -542,11 +559,20 @@ handle_method(#'channel.open'{}, _, _State) -> handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = rollback_and_notify(State), - ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), +handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> stop; +handle_method(#'channel.close'{}, _, State = #ch{state = closing}) -> + {reply, #'channel.close_ok'{}, State}; + +handle_method(_Method, _, State = #ch{state = closing}) -> + {noreply, State}; + +handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> + {ok, State1} = rollback_and_notify(State), + ReaderPid ! {channel_closing, self()}, + {noreply, State1}; + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -1156,9 +1182,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, - WriterPid, Reason) -> - {_Close, ReplyCode, ReplyText} = - rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), + #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -1245,10 +1270,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey, NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}). +rollback_and_notify(State = #ch{state = closing}) -> + {ok, State}; rollback_and_notify(State = #ch{transaction_id = none}) -> - notify_queues(State); + {notify_queues(State), State#ch{state = closing}}; rollback_and_notify(State) -> - notify_queues(internal_rollback(State)). + State1 = internal_rollback(State), + {notify_queues(State1), State1#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( @@ -1308,10 +1336,10 @@ is_message_persistent(Content) -> end. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_route), + ok = basic_return(Msg, State, no_route), record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + ok = basic_return(Msg, State, no_consumers), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1384,10 +1412,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -terminate(_State) -> - pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 90058194..9cc407bc 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,12 +31,13 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {'tcp', rabbit_types:protocol(), rabbit_net:socket(), - rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), pid(), rabbit_types:protocol(), rabbit_types:user(), + rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()} | - {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}). + {'direct', rabbit_channel:channel_number(), pid(), + rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -44,7 +45,7 @@ %%---------------------------------------------------------------------------- -start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, +start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -57,20 +58,20 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, User, VHost, Capabilities, - Collector, start_limiter_fun(SupPid)]}, + [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, User, VHost, Capabilities, - Collector}) -> +start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, User, - VHost, Capabilities, Collector, + [Channel, ClientChannelPid, ClientChannelPid, Protocol, + User, VHost, Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 5c89bf49..586563f6 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/6]). +-export([boot/0, connect/4, start_channel/7]). -include("rabbit.hrl"). @@ -28,10 +28,10 @@ -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/6 :: - (rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> - {'ok', pid()}). +-spec(start_channel/7 :: + (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid()) -> {'ok', pid()}). -endif. @@ -69,10 +69,11 @@ connect(Username, Password, VHost, Protocol) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, User, VHost, Capabilities, Collector) -> +start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities, + Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, User, VHost, Capabilities, - Collector}]), + [{direct, Number, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index be5a90af..c5d6ecc4 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,92 +57,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). -%% connection lifecycle -%% -%% all state transitions and terminations are marked with *...* -%% -%% The lifecycle begins with: start handshake_timeout timer, *pre-init* -%% -%% all states, unless specified otherwise: -%% socket error -> *exit* -%% socket close -> *throw* -%% writer send failure -> *throw* -%% forced termination -> *exit* -%% handshake_timeout -> *throw* -%% pre-init: -%% receive protocol header -> send connection.start, *starting* -%% starting: -%% receive connection.start_ok -> *securing* -%% securing: -%% check authentication credentials -%% if authentication success -> send connection.tune, *tuning* -%% if more challenge needed -> send connection.secure, -%% receive connection.secure_ok *securing* -%% otherwise send close, *exit* -%% tuning: -%% receive connection.tune_ok -> start heartbeats, *opening* -%% opening: -%% receive connection.open -> send connection.open_ok, *running* -%% running: -%% receive connection.close -> -%% tell channels to terminate gracefully -%% if no channels then send connection.close_ok, start -%% terminate_connection timer, *closed* -%% else *closing* -%% forced termination -%% -> wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *exit* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing, *running* -%% handshake_timeout -> ignore, *running* -%% heartbeat timeout -> *throw* -%% conserve_memory=true -> *blocking* -%% blocking: -%% conserve_memory=true -> *blocking* -%% conserve_memory=false -> *running* -%% receive a method frame for a content-bearing method -%% -> process, stop receiving, *blocked* -%% ...rest same as 'running' -%% blocked: -%% conserve_memory=true -> *blocked* -%% conserve_memory=false -> resume receiving, *running* -%% ...rest same as 'running' -%% closing: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closing* -%% receive frame -> ignore, *closing* -%% handshake_timeout -> ignore, *closing* -%% heartbeat timeout -> *throw* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing -%% if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% else *closing* -%% channel exits normally -%% -> if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% closed: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closed* -%% receive connection.close_ok -> self() ! terminate_connection, -%% *closed* -%% receive frame -> ignore, *closed* -%% terminate_connection timeout -> *terminate* -%% handshake_timeout -> ignore, *closed* -%% heartbeat timeout -> *throw* -%% channel exit -> log error, *closed* -%% -%% -%% TODO: refactor the code so that the above is obvious - -define(IS_RUNNING(State), (State#v1.connection_state =:= running orelse State#v1.connection_state =:= blocking orelse @@ -338,6 +252,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {conserve_memory, Conserve} -> mainloop(Deb, internal_conserve_memory(Conserve, State)); + {channel_closing, ChPid} -> + ok = rabbit_channel:ready_for_close(ChPid), + channel_cleanup(ChPid), + mainloop(Deb, State); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -445,32 +363,32 @@ close_connection(State = #v1{queue_collector = Collector, erlang:send_after(TimeoutMillisec, self(), terminate_connection), State#v1{connection_state = closed}. -close_channel(Channel, State) -> - put({channel, Channel}, closing), - State. - handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - erase({ch_pid, ChPid}), + channel_cleanup(ChPid), maybe_close(State); uncontrolled -> case channel_cleanup(ChPid) of undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> maybe_close( + Channel -> rabbit_log:error( + "connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close( handle_exception(State, Channel, Reason)) end end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of - undefined -> undefined; - Channel -> erase({channel, Channel}), - erase({ch_pid, ChPid}), - Channel + undefined -> undefined; + {Channel, MRef} -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel end. -all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. terminate_channels() -> NChannels = @@ -525,8 +443,8 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. -termination_kind(normal) -> controlled; -termination_kind(_) -> uncontrolled. +termination_kind(normal) -> controlled; +termination_kind(_) -> uncontrolled. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, @@ -562,8 +480,8 @@ handle_frame(Type, Channel, Payload, Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of - {method, 'channel.close', _} -> - erase({channel, Channel}), + {method, 'channel.close_ok', _} -> + channel_cleanup(ChPid), State; {method, MethodName, _} -> case (State#v1.connection_state =:= blocking @@ -575,25 +493,6 @@ handle_frame(Type, Channel, Payload, _ -> State end; - closing -> - %% According to the spec, after sending a - %% channel.close we must ignore all frames except - %% channel.close and channel.close_ok. In the - %% event of a channel.close, we should send back a - %% channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erase({channel, Channel}); - {method, 'channel.close', _} -> - %% We're already closing this channel, so - %% there's no cleanup to do (notify - %% queues, etc.) - ok = rabbit_writer:internal_send_command( - State#v1.sock, Channel, - #'channel.close_ok'{}, Protocol); - _ -> ok - end, - State; undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -970,13 +869,13 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), - erlang:monitor(process, ChPid), + MRef = erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), - put({ch_pid, ChPid}, Channel), + put({ch_pid, ChPid}, {Channel, MRef}), State. process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> @@ -992,29 +891,20 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> AState end. -log_channel_error(ConnectionState, Channel, Reason) -> - rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", - [self(), ConnectionState, Channel, Reason]). - -handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log_channel_error(closed, Channel, Reason), +handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> State; -handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> - log_channel_error(CS, Channel, Reason), +handle_exception(State, Channel, Reason) -> send_exception(State, Channel, Reason). send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = + {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - NewState = case ShouldClose of - true -> terminate_channels(), - close_connection(State); - false -> close_channel(Channel, State) - end, + terminate_channels(), + State1 = close_connection(State), ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod, Protocol), - NewState. + State1#v1.sock, 0, CloseMethod, Protocol), + State1. internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2015170a..b5073f90 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1019,9 +1019,9 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - user(<<"user">>), <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1079,9 +1079,9 @@ test_server_status() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, user(<<"guest">>), - <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) @@ -1305,7 +1305,7 @@ test_queue_cleanup(_SecondaryNode) -> rabbit_channel:do(Ch, #'queue.declare'{ passive = true, queue = ?CLEANUP_QUEUE_NAME }), receive - {channel_exit, 1, {amqp_error, not_found, _, _}} -> + #'channel.close'{reply_code = 404} -> ok after 2000 -> throw(failed_to_receive_channel_exit) |