diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-07-16 11:15:15 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-07-16 11:15:15 +0100 |
commit | 079188669234c2522b61284bee46574410a5f158 (patch) | |
tree | 1ce52cc7a711ae0ecfc4b760ded640cef3c99fad | |
parent | 82b008482ae58b0f10af773b473f1ebd19e818b5 (diff) | |
parent | b78427551f4a11ee1e83062711629ced04afdfbc (diff) | |
download | rabbitmq-server-079188669234c2522b61284bee46574410a5f158.tar.gz |
merge default
-rw-r--r-- | Makefile | 2 | ||||
-rwxr-xr-x | packaging/macports/make-port-diff.sh | 6 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 52 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 4 | ||||
-rw-r--r-- | src/rabbit_prelaunch.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 135 |
10 files changed, 124 insertions, 124 deletions
@@ -216,7 +216,7 @@ run-qc: all start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) - setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & + nohup sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" > /dev/null & ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel start-rabbit-on-node: all diff --git a/packaging/macports/make-port-diff.sh b/packaging/macports/make-port-diff.sh index 3eb1b9f5..ac3afa4e 100755 --- a/packaging/macports/make-port-diff.sh +++ b/packaging/macports/make-port-diff.sh @@ -14,8 +14,10 @@ mkdir -p $dir/macports $dir/rabbitmq cd $dir/macports svn checkout http://svn.macports.org/repository/macports/trunk/dports/net/rabbitmq-server/ 2>&1 >/dev/null -# Clear out the svn $id tag -sed -i -e 's|^# \$.*$|# $Id$|' rabbitmq-server/Portfile +# Clear out the svn $id tag from the Portfile (and avoid using -i) +portfile=rabbitmq-server/Portfile +sed -e 's|^# \$.*$|# $Id$|' ${portfile} > ${portfile}.new +mv ${portfile}.new ${portfile} # Get the files from the rabbitmq.com macports repo cd ../rabbitmq diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index f3b4dbaf..13ee4249 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -374,11 +374,11 @@ sync(Ref) -> end). needs_sync(Ref) -> - with_handles( - [Ref], - fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false; - ([_Handle]) -> true - end). + %% This must *not* use with_handles/2; see bug 25052 + case get({Ref, fhc_handle}) of + #handle { is_dirty = false, write_buffer = [] } -> false; + #handle {} -> true + end. position(Ref, NewOffset) -> with_flushed_handles( diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 22c6a223..864e100a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -267,7 +267,7 @@ handle_cast({method, Method, Content, Flow}, catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - send_exception(Reason#amqp_error{method = MethodName}, State); + handle_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -400,11 +400,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -send_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid}) -> +handle_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid}) -> {CloseChannel, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", @@ -418,6 +418,11 @@ send_exception(Reason, State = #ch{protocol = Protocol, {stop, normal, State1} end. +precondition_failed(Format) -> precondition_failed(Format, []). + +precondition_failed(Format, Params) -> + rabbit_misc:protocol_error(precondition_failed, Format, Params). + return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, @@ -461,9 +466,9 @@ check_user_id_header(#'P_basic'{user_id = Username}, ok; check_user_id_header(#'P_basic'{user_id = Claimed}, #ch{user = #user{username = Actual}}) -> - rabbit_misc:protocol_error( - precondition_failed, "user_id property set to '~s' but " - "authenticated user was '~s'", [Claimed, Actual]). + precondition_failed( + "user_id property set to '~s' but authenticated user was '~s'", + [Claimed, Actual]). check_internal_exchange(#exchange{name = Name, internal = true}) -> rabbit_misc:protocol_error(access_refused, @@ -625,8 +630,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, State1#ch{uncommitted_message_q = NewTMQ} end}; {error, Reason} -> - rabbit_misc:protocol_error(precondition_failed, - "invalid message: ~p", [Reason]) + precondition_failed("invalid message: ~p", [Reason]) end; handle_method(#'basic.nack'{delivery_tag = DeliveryTag, @@ -881,8 +885,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> rabbit_misc:not_found(ExchangeName); {error, in_use} -> - rabbit_misc:protocol_error( - precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]); + precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, #'exchange.delete_ok'{}) end; @@ -980,11 +983,9 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> - rabbit_misc:protocol_error( - precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); + precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); {error, not_empty} -> - rabbit_misc:protocol_error( - precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); + precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}) @@ -1019,15 +1020,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin, #'queue.purge_ok'{message_count = PurgedMessageCount}); handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot switch from confirm to tx mode", []); + precondition_failed("cannot switch from confirm to tx mode"); handle_method(#'tx.select'{}, _, State) -> {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> - rabbit_misc:protocol_error( - precondition_failed, "channel is not transactional", []); + precondition_failed("channel is not transactional"); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, @@ -1041,8 +1040,7 @@ handle_method(#'tx.commit'{}, _, {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> - rabbit_misc:protocol_error( - precondition_failed, "channel is not transactional", []); + precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, uncommitted_acks = TAL, @@ -1052,8 +1050,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot switch from tx to confirm mode", []); + precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, @@ -1263,8 +1260,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> QTail, DeliveryTag, Multiple) end; {empty, _} -> - rabbit_misc:protocol_error( - precondition_failed, "unknown delivery tag ~w", [DeliveryTag]) + precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. ack(Acked, State) -> @@ -1423,7 +1419,7 @@ complete_tx(State = #ch{tx_status = committing}) -> ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), State#ch{tx_status = in_progress}; complete_tx(State = #ch{tx_status = failed}) -> - {noreply, State1} = send_exception( + {noreply, State1} = handle_exception( rabbit_misc:amqp_error( precondition_failed, "partial tx completion", [], 'tx.commit'), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 3e058793..10debb0b 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -36,8 +36,6 @@ length_fun }). --define(ONE_SECOND, 1000). - -ifdef(use_specs). -spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined', @@ -325,7 +323,6 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> true = link(GM), GM end, - ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, monitors = pmon:new(), @@ -359,11 +356,6 @@ handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> handle_cast({delete_and_terminate, Reason}, State) -> {stop, Reason, State}. -handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> - gm:broadcast(GM, heartbeat), - ensure_gm_heartbeat(), - noreply(State); - handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Mons, death_fun = DeathFun }) -> @@ -399,7 +391,7 @@ members_changed([_CPid], _Births, []) -> members_changed([CPid], _Births, Deaths) -> ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). -handle_msg([_CPid], _From, heartbeat) -> +handle_msg([_CPid], _From, master_changed) -> ok; handle_msg([CPid], _From, request_length = Msg) -> ok = gen_server2:cast(CPid, Msg); @@ -420,6 +412,3 @@ noreply(State) -> reply(Reply, State) -> {reply, Reply, State, hibernate}. - -ensure_gm_heartbeat() -> - erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 03fafc3e..60d3e027 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -199,7 +199,12 @@ handle_call({gm_deaths, Deaths}, From, %% master has changed to not us. gen_server2:reply(From, ok), erlang:monitor(process, Pid), - ok = gm:broadcast(GM, heartbeat), + %% GM is lazy. So we know of the death of the + %% slave since it is a neighbour of ours, but + %% until a message is sent, not all members will + %% know. That might include the new master. So + %% broadcast a no-op message to wake everyone up. + ok = gm:broadcast(GM, master_changed), noreply(State #state { master_pid = Pid }) end end; @@ -341,7 +346,7 @@ members_changed([_SPid], _Births, []) -> members_changed([SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). -handle_msg([_SPid], _From, heartbeat) -> +handle_msg([_SPid], _From, master_changed) -> ok; handle_msg([_SPid], _From, request_length) -> %% This is only of value to the master @@ -452,7 +457,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, rabbit_mirror_queue_master:length_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), - ok = gm:confirmed_broadcast(GM, heartbeat), + %% TODO this has been in here since the beginning, but it's not + %% obvious if it is needed. Investigate... + ok = gm:confirmed_broadcast(GM, master_changed), %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d41aa09b..1fefa688 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -36,7 +36,7 @@ -export([execute_mnesia_transaction/2]). -export([execute_mnesia_tx_with_tail/1]). -export([ensure_ok/2]). --export([tcp_name/3]). +-export([tcp_name/3, format_inet_error/1]). -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). @@ -152,6 +152,7 @@ -spec(tcp_name/3 :: (atom(), inet:ip_address(), rabbit_networking:ip_port()) -> atom()). +-spec(format_inet_error/1 :: (atom()) -> string()). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), @@ -510,6 +511,10 @@ tcp_name(Prefix, IPAddress, Port) list_to_atom( format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])). +format_inet_error(address) -> "cannot connect to host/port"; +format_inet_error(timeout) -> "timed out"; +format_inet_error(Error) -> inet:format_error(Error). + %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about %% the order in which results are received. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 1c23632d..c8d77b0f 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -70,8 +70,8 @@ diagnostics0() -> diagnostics_host(Host) -> case names(Host) of {error, EpmdReason} -> - {"- unable to connect to epmd on ~s: ~w", - [Host, EpmdReason]}; + {"- unable to connect to epmd on ~s: ~w (~s)", + [Host, EpmdReason, rabbit_misc:format_inet_error(EpmdReason)]}; {ok, NamePorts} -> {"- ~s: ~p", [Host, [{list_to_atom(Name), Port} || diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index d56211b5..b0454435 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -67,9 +67,5 @@ duplicate_node_check(NodeStr) -> {error, EpmdReason} -> rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n", [NodeHost, EpmdReason, - case EpmdReason of - address -> "unable to establish tcp connection"; - timeout -> "timed out establishing tcp connection"; - _ -> inet:format_error(EpmdReason) - end]) + rabbit_misc:format_inet_error(EpmdReason)]) end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bd5cf588..75246007 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> server_capabilities(_) -> []. +%%-------------------------------------------------------------------------- + log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) -> update_last_blocked_by(State = #v1{conserve_resources = false}) -> State#v1{last_blocked_by = flow}. +%%-------------------------------------------------------------------------- +%% error handling / termination + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -412,18 +417,6 @@ handle_dependent_exit(ChPid, Reason, State) -> Channel, Reason)) end. -channel_cleanup(ChPid) -> - case get({ch_pid, ChPid}) of - undefined -> undefined; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - Channel - end. - -all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. - terminate_channels() -> NChannels = length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), @@ -477,6 +470,53 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> + State; +handle_exception(State, Channel, Reason) -> + send_exception(State, Channel, Reason). + +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {0, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + terminate_channels(), + State1 = close_connection(State), + ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), + State1. + +%%-------------------------------------------------------------------------- + +create_channel(Channel, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State, + {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), + Protocol, User, VHost, Capabilities, Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + ok. + +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel + end. + +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. + +%%-------------------------------------------------------------------------- + handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) @@ -522,6 +562,17 @@ process_frame(Frame, Channel, State) -> Channel, State#v1.connection_state, Frame}) end. +process_channel_frame(Frame, ChPid, AState) -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> {ok, NewAState}; + {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + {ok, NewAState}; + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), + {ok, NewAState}; + {error, Reason} -> {error, Reason} + end. + post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), control_throttle(State); @@ -536,13 +587,15 @@ post_process_frame({method, MethodName, _}, _ChPid, post_process_frame(_Frame, _ChPid, State) -> control_throttle(State). +%%-------------------------------------------------------------------------- + handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1)); -handle_input({frame_payload, Type, Channel, PayloadSize}, - PayloadAndMarker, State) -> +handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, + State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> switch_callback(handle_frame(Type, Channel, Payload, State), @@ -834,8 +887,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end, - fun ([{_, I}]) -> I end); + socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end, + fun ([{_, I}]) -> I end, Sock); i(state, #v1{connection_state = S}) -> S; i(last_blocked_by, #v1{last_blocked_by = By}) -> @@ -871,10 +924,7 @@ i(Item, #v1{}) -> throw({bad_argument, Item}). socket_info(Get, Select, Sock) -> - socket_info(fun() -> Get(Sock) end, Select). - -socket_info(Get, Select) -> - case Get() of + case Get(Sock) of {ok, T} -> Select(T); {error, _} -> '' end. @@ -897,51 +947,6 @@ cert_info(F, Sock) -> {ok, Cert} -> list_to_binary(F(Cert)) end. -%%-------------------------------------------------------------------------- - -create_channel(Channel, State) -> - #v1{sock = Sock, queue_collector = Collector, - channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost, - capabilities = Capabilities}} = State, - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - ok. - -process_channel_frame(Frame, ChPid, AState) -> - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> {ok, NewAState}; - {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - {ok, NewAState}; - {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( - ChPid, Method, Content), - {ok, NewAState}; - {error, Reason} -> {error, Reason} - end. - -handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> - State; -handle_exception(State, Channel, Reason) -> - send_exception(State, Channel, Reason). - -send_exception(State = #v1{connection = #connection{protocol = Protocol}}, - Channel, Reason) -> - {0, CloseMethod} = - rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - terminate_channels(), - State1 = close_connection(State), - ok = rabbit_writer:internal_send_command( - State1#v1.sock, 0, CloseMethod, Protocol), - State1. - emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). |