diff options
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r-- | src/rabbit_reader.erl | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 8553e36d..9ffcd203 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,7 +18,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -77,7 +77,7 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). --spec(force_event_refresh/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/2 :: (pid(), reference()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -134,8 +134,8 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. -force_event_refresh(Pid) -> - gen_server:cast(Pid, force_event_refresh). +force_event_refresh(Pid, Ref) -> + gen_server:cast(Pid, {force_event_refresh, Ref}). conserve_resources(Pid, Source, Conserve) -> Pid ! {conserve_resources, Source, Conserve}, @@ -156,19 +156,23 @@ server_properties(Protocol) -> [case X of {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), longstr, - list_to_binary(Value)}; + maybe_list_to_binary(Value)}; {BinKey, Type, Value} -> {BinKey, Type, Value} end || X <- RawConfigServerProps ++ - [{product, Product}, - {version, Version}, - {platform, "Erlang/OTP"}, - {copyright, ?COPYRIGHT_MESSAGE}, - {information, ?INFORMATION_MESSAGE}]]], + [{product, Product}, + {version, Version}, + {cluster_name, rabbit_nodes:cluster_name()}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]]], %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). +maybe_list_to_binary(V) when is_binary(V) -> V; +maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V). + server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, @@ -285,8 +289,11 @@ recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> throw({become, F(Deb, Buf, BufLen, State)}); recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> - ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); + case rabbit_net:setopts(Sock, [{active, once}]) of + ok -> mainloop(Deb, Buf, BufLen, + State#v1{pending_recv = true}); + {error, Reason} -> stop(Reason, State) + end; recvloop(Deb, [B], _BufLen, State) -> {Rest, State1} = handle_input(State#v1.callback, B, State), recvloop(Deb, [Rest], size(Rest), State1); @@ -312,11 +319,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> closed when State#v1.connection_state =:= closed -> ok; closed -> - maybe_emit_stats(State), - throw(connection_closed_abruptly); + stop(closed, State); {error, Reason} -> - maybe_emit_stats(State), - throw({inet_error, Reason}); + stop(Reason, State); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, ?MODULE, Deb, {Buf, BufLen, State}); @@ -327,6 +332,11 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> end end. +stop(closed, State) -> maybe_emit_stats(State), + throw(connection_closed_abruptly); +stop(Reason, State) -> maybe_emit_stats(State), + throw({inet_error, Reason}). + handle_other({conserve_resources, Source, Conserve}, State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> @@ -389,10 +399,11 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> catch Error -> {error, Error} end), State; -handle_other({'$gen_cast', force_event_refresh}, State) +handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) when ?IS_RUNNING(State) -> - rabbit_event:notify(connection_created, - [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), + rabbit_event:notify( + connection_created, + [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), State; handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. @@ -951,6 +962,9 @@ validate_negotiated_integer_value(Field, Min, ClientValue) -> ok end. +%% keep dialyzer happy +-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) -> + no_return(). fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> {S1, S2} = case MinOrMax of min -> {lower, minimum}; |