diff options
author | Anders Svensson <anders@erlang.org> | 2017-06-10 23:40:53 +0200 |
---|---|---|
committer | Anders Svensson <anders@erlang.org> | 2017-06-11 16:30:39 +0200 |
commit | eadf4efc7e264fe8dd30befb42a42a02cdef58f1 (patch) | |
tree | 6ac266589946ff35f843cbf89d6913113cbb1629 /lib/diameter | |
parent | 034089ed1ba3f78c732edcfc84d85a6ed4a4854e (diff) | |
download | erlang-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.gz |
Make diameter_{tcp,sctp} sender configurable
With sends still from the receiving process by default, since changing
the default behaviour may well have negative effects. A separate sender
probably implies a greater need for some form of load regulation for
one, since a blocking send would no longer imply that incoming messages
are no longer recevied. Dealing with this could result in the same
deadlock that the sending process intends to avoid, but the user should
be in control over how/when incoming traffic is regulated.
Diffstat (limited to 'lib/diameter')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 59 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 81 |
2 files changed, 90 insertions, 50 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index d23e56b413..3919596cb1 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -68,6 +68,7 @@ -type connect_option() :: {raddr, inet:ip_address()} | {rport, inet:port_number()} + | option() | term(). %% gen_sctp:open_option(). -type match() :: inet:ip_address() @@ -75,8 +76,12 @@ | [match()]. -type listen_option() :: {accept, match()} + | option() | term(). %% gen_sctp:open_option(). +-type option() :: {sender, boolean()} + | sender. + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -96,7 +101,7 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream - monitor :: pid() | undefined}). %% sending process + send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, @@ -110,7 +115,8 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()]}). + accept :: [match()], + sender :: boolean()}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[Matches], Rest} = proplists:split(Opts, [accept]), + {Split, Rest} = proplists:split(Opts, [accept, sender]), + OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), @@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- Matches]}; + accept = [[M] || {accept, M} <- OwnOpts], + sender = proplists:get_value(sender, OwnOpts, false)}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), - RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + OwnOpts = lists:append(Split), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), @@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) -> monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, - socket = Sock}; + socket = Sock, + send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an %% association. @@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches} when K == peeloff -> %% association + {K, T, Matches, Bool} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(T, S#transport{socket = Sock}); + t(T, S#transport{socket = Sock, + send = Bool}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -466,11 +477,12 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches} + accept = Matches, + sender = Sender} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, setopts(Sock), NewS; @@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - undefined == MPid + send = MPid}) -> + is_boolean(MPid) orelse ok == (catch gen_server:call(MPid, {stop, Pid})) orelse exit(MPid, kill), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop; %% Timeout after transport process has been started. @@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 %% Start monitor process on first send. -send(Msg, #transport{monitor = undefined, +send(Msg, #transport{send = true, socket = Sock, assoc_id = AId} = S) -> @@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined, socket = Sock, assoc_id = AId}), monitor(process, MPid), - send(Msg, S#transport{monitor = MPid}); + send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, @@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{monitor = MPid}) -> +send(StreamId, Bin, #transport{send = false, + socket = Sock, + assoc_id = AId}) -> + send(Sock, AId, StreamId, Bin); + +send(StreamId, Bin, #transport{send = MPid}) -> MPid ! {Bin, StreamId}, MPid; send(StreamId, Bin, #monitor{socket = Sock, assoc_id = AId}) -> - case gen_sctp:send(Sock, AId, StreamId, Bin) of + send(Sock, AId, StreamId, Bin). + +%% send/4 + +send(Sock, AssocId, StreamId, Bin) -> + case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> ok; {error, Reason} -> diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 427b2395b9..edbbec1709 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -69,16 +69,16 @@ %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and %% a process owning the listening port. The monitor process -%% historically died after connection establishment, but now lives on -%% as the sender of outgoing messages, so that a blocking send doesn't -%% prevent messages from being received. +%% historically died after connection establishment, but can now live +%% on as the sender of outgoing messages, so that a blocking send +%% doesn't prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. The name monitor predates its role as sender. +%% Monitor process state. -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), @@ -108,6 +108,7 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} + | {sender, boolean()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. @@ -120,7 +121,7 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - monitor :: pid()}). %% monitor/sender process + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% that kills us with the parent until call returns, and then %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + sender, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), + Sender = proplists:get_value(sender, OwnOpts, false), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, - monitor(process, MPid), - MPid ! {start, self(), Sock, M}, %% prepare monitor for sending + Sender andalso monitor(process, MPid), + MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, timeout = Tmo, - monitor = MPid}); + send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -515,15 +518,22 @@ m(Bin, S) send(Bin, S), S; -%% Transport is telling us to be ready to send. Stop monitoring on the +%% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, Sock, Mod}, #monitor{parent = MRef, - transport = TPid} - = S) -> - demonitor(MRef, [flush]), - S#monitor{parent = false, - socket = Sock, - module = Mod}; +m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> + case T of + {Sock, Mod} -> + demonitor(S#monitor.parent, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; + false -> %% monitor not sending + x(M) + end; + +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); %% Transport is telling us to die. m({stop, TPid} = T, #monitor{transport = TPid}) -> @@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock, %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - ok == (catch gen_server:call(MPid, {stop, Pid})) - orelse exit(MPid, kill), + send = MPid}) -> + false == MPid + orelse (ok == gen_server:call(MPid, {stop, self()}, 1000)) + orelse exit(MPid, {shutdown, parent}), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop. %% Crash on anything unexpected. @@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) -> tls_handshake(Type, true, #transport{socket = Sock, module = M, ssl = Opts, - monitor = MPid} + send = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), - MPid ! {tls, SSock}, %% tell the monitor process + false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process S#transport{socket = SSock, module = ssl}; @@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) -> %% send/2 +send(false, #transport{}) -> %% ack + ok; + send(#diameter_packet{bin = Bin}, S) -> send(Bin, S); -send(Bin, #monitor{} = S) -> - send1(Bin, S); +send(Bin, #transport{socket = Sock, module = M, send = false}) -> + send1(M, Sock, Bin); + +send(Bin, #monitor{socket = Sock, module = M}) -> + send1(M, Sock, Bin); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{monitor = MPid}) -> - MPid ! Bin, - MPid. +send(Bin, #transport{send = Pid}) -> + Pid ! Bin, + ok. -%% send1/2 +%% send1/3 -send1(Bin, #monitor{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of +send1(Mod, Sock, Bin) -> + case send(Mod, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> |