diff options
-rw-r--r-- | lib/diameter/src/base/diameter_dist.erl | 293 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 13 |
2 files changed, 264 insertions, 42 deletions
diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl index ed2859e914..5c29ea95a4 100644 --- a/lib/diameter/src/base/diameter_dist.erl +++ b/lib/diameter/src/base/diameter_dist.erl @@ -28,12 +28,20 @@ %% requests to handler processes (local or remote) in various ways. %% -%% spawn_opt callbacks; initial argument constructed in diameter_traffic +%% spawn_opt callbacks -export([spawn_local/2, spawn_local/1, route_session/2, route_session/1]). +%% signal availability for handling incoming requests to route_sesssion/2 +-export([attach/1, + detach/1]). + +%% consistent hashing +-export([hash/3, %% for use as default MFA in route_session/2 options map + hash/2]). %% arbitrary key/values + -include_lib("diameter/include/diameter.hrl"). %% server start @@ -50,9 +58,21 @@ -type request() :: tuple(). %% callback argument from diameter_traffic -define(SERVER, ?MODULE). %% server monitoring node connections --define(TABLE, ?MODULE). %% node() binary -> node() atom + +%% Maps a node name binary to the corresponding atom. Used by +%% route_session/2 to map the optional value of a Session-Id to +%% node(). +-define(NODE_TABLE, diameter_dist_node). + +%% Maps a diameter:service_name() to a node() that has called attach/1 +%% to declare its willingness to handle incoming requests for the +%% service. Use by route_session/2 in case the optional value mapping +%% has failed. +-define(SERVICE_TABLE, diameter_dist_service). -define(B(A), atom_to_binary(A, utf8)). +-define(ORCOND(List), list_to_tuple(['orelse', false | List])). +-define(HASH(T), erlang:phash2(T, 16#100000000)). %% spawn_local/2 %% @@ -76,32 +96,82 @@ spawn_local(ReqT) -> %% route_session/2 %% -%% Callback that routes requests containing Session-Id AVPs as -%% returned by diameter:session_id/0 back to the node on which the -%% function was called. This is only appropriate when sessions are -%% only initiated by the own (typically client) node, and ids have -%% been returned from diameter:session_id/0. +%% Callback that maps the Session-Id of an incoming request to a +%% handler node. %% -%% This can be used with #{search => 0} to route on something other -%% than Session-Id since default can be an MFA returning a node() -%% (applied to the incoming diameter_packet record) and dispatch can -%% be an MFA returning a pid() (applied to Node and the request MFA), -%% but this is no simpler than just implementing an own spawn_opt -%% callback. (Except with the default dispatch possibly.) - +%% With an options list, maps an id whose optional value is the name +%% of a connected node to the same node, to handle the case that the +%% session id has been returned from diameter:session_id/1; otherwise +%% to a node that has called diameter_dist:attach/1 using the +%% consistent hashing provided by hash/3, or to the local node() if a +%% session id could not be extracted or there are no attached nodes. A +%% handler process is spawned on the selected node using +%% erlang:spawn_opt/4. +%% +%% Different behaviour can be configured by supplying an options map +%% of the following form: +%% +%% #{search => non_neg_integer(), +%% id => [binary()], +%% default => discard | local | mfa(), +%% dispatch => list() | mfa()} +%% +%% The search member limits the number of AVPs that are examined in +%% the message (from the front), to avoid searching entire message in +%% case it's known that peers follow RFC 6733's recommendation that +%% Session-Id be placed at the head of a message. The default is to +%% search the entire message. +%% +%% The id member restricts the optional value mapping to session ids +%% whose DiamterIdentity is one of those specified. Set this to the +%% list of Diameter identities advertised by the service in question +%% (typically one) to ensure that only locally generated session ids +%% are mapped; or to the empty list to disable the mapping. +%% +%% The default member determines where to handle a message whose +%% Session-Id isn't found or whose optional value isn't mapped to the +%% name of a connected node. The atom local says the local node, an +%% MFA is invoked on Session-Id | false, the name of the diameter +%% service, and the message binary, and should return either a node() +%% or false to discard the message. Defaults to {diameter_dist, hash, []}. +%% +%% The dispatch member determines how the pid() of the request handler +%% process is retrieved. An MFA is applied to a previously selected +%% node(), and the module, function, and arguments list to apply in +%% the handler process to handle the request, the MFA being supplied +%% by diameter, and returns pid() | discard. A list is equivalent to +%% {erlang, spawn_opt, []}. Defaults to []. +%% +%% This can be used with search = 0 to route on something other than +%% Session-Id, but this is probably no simpler than just implementing +%% an own spawn_opt callback. (Except with the default dispatch possibly.) +%% +%% Note that if the peer is also implemented with OTP diameter and +%% generating session ids with diameter:session_id/1 then +%% route_session/2 can map an optional value to a local node that +%% happens to have the same name as one of the peer's nodes. This +%% could lead to an uneven distribution; for example, if the peer +%% nodes are a subset of the local nodes. In practice, it's typically +%% known if it's peers or the local node originating sessions; if the +%% former then setting id = [] disables the optional value mapping, if +%% the latter then setting default = local disables the hashing. -spec route_session(ReqT :: request(), Opts) -> discard | pid() when Opts :: pos_integer() %% aka #{search => N} | list() %% aka #{dispatch => Opts} | #{search => non_neg_integer(), %% limit number of examined AVPs - default => discard | mfa(), %% return node() | false - dispatch => list() | mfa()}. %% spawn options or return pid() + id => [binary()], %% restrict optional value map on DiamIdent + default => local %% handle locally + | discard + | mfa(), %% return node() | false + dispatch => list() %% spawn options + | mfa()}. %% (Node, M, F, A) -> pid() | discard route_session(ReqT, Opts) -> - #diameter_packet{bin = Bin} = Pkt = element(1, ReqT), + {_, Bin} = Info = diameter_traffic:request_info(ReqT), Sid = session_id(avps(Bin), search(Opts)), - Node = default(node_of_session_id(Sid), Sid, Opts, Pkt), + Node = default(node_of_session_id(Sid, Opts), Sid, Opts, Info), dispatch(Node, ReqT, dispatch(Opts)). %% avps/1 @@ -128,24 +198,31 @@ dispatch(Node, ReqT, Opts) -> route_session(ReqT) -> route_session(ReqT, []). -%% node_of_session_id/1 +%% node_of_session_id/2 %% %% Return the node name encoded as optional value in a Session-Id, %% assuming the id has been created with diameter:session_id/0. Lookup %% the node name to ensure we don't convert arbitrary binaries to %% atom. -node_of_session_id([_, _, _, Bin]) -> - case ets:lookup(?TABLE, Bin) of - [{_, Node}] -> - Node; - [] -> - false - end; +node_of_session_id([Id, _, _, Bin], #{id := Ids}) -> + lists:member(Id, Ids) andalso nodemap(Bin); -node_of_session_id(_) -> +node_of_session_id([_, _, _, Bin], _) -> + nodemap(Bin); + +node_of_session_id(_, _) -> false. +%% nodemap/1 + +nodemap(Bin) -> + try + ets:lookup_element(?NODE_TABLE, Bin, 2) + catch + error: badarg -> false + end. + %% session_id/2 session_id(_, 0) -> %% give up @@ -154,7 +231,7 @@ session_id(_, 0) -> %% give up %% Session-Id = Command Code 263, V-bit = 0. session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) -> case Bin of - <<Avp:Len/binary>> -> + <<Avp:Len/binary, _/binary>> -> <<_:8/binary, Sid/binary>> = Avp, split(Sid); _ -> @@ -248,20 +325,123 @@ search(_) -> %% %% Choose a node when Session-Id lookup has failed. -default(false = No, _, #{default := discard}, _) -> - No; - -default(false, Sid, #{default := {M,F,A}}, Pkt) -> - apply(M, F, [Sid, Pkt | A]); %% false | node() +default(false, _, #{default := discard}, _) -> + false; -default(false, _, _, _) -> +default(false, _, #{default := local}, _) -> node(); +default(false, Sid, #{default := {M,F,A}}, Info) -> + {ServiceName, Bin} = Info, + apply(M, F, [Sid, ServiceName, Bin | A]); %% node() | false + +default(false, Sid, _, Info) -> %% aka {?MODULE, hash, []} + {ServiceName, Bin} = Info, + hash(Sid, ServiceName, Bin); + default(Node, _, _, _) -> Node. %% =========================================================================== +%% hash/3 +%% +%% Consistent hashing of Session-Id to an attached node, or the local +%% node if Session-Id = false or no attached nodes. + +hash(Sid, ServiceName, _) -> + case false /= Sid andalso attached(ServiceName) of + [_|_] = Nodes -> + hash(Sid, Nodes); + _ -> + node() + end. + +%% hash/2 +%% +%% Consistent hashing on arbitrary key/values. Returns false if the +%% list is empty. + +%% No key or no values. +hash(_, []) -> + false; + +%% Not much choice. +hash(_, [Value]) -> + Value; + +%% Hash on a circle and choose the closest predecessor. +hash(Key, Values) -> + Hash = ?HASH(Key), + tl(lists:foldl(fun(V,A) -> + choose(Hash, [?HASH({Key, V}) | V], A) + end, + false, %% < list() + Values)). + +%% choose/3 + +choose(Hash, [Hash1 | _] = T, [Hash2 | _]) + when Hash1 =< Hash, Hash < Hash2 -> + T; + +choose(Hash, [Hash1 | _], [Hash2 | _] = T) + when Hash2 =< Hash, Hash < Hash1 -> + T; + +choose(_, T1, T2) -> + max(T1, T2). + +%% =========================================================================== + +%% attach/1 +%% +%% Register the local node as a handler of incoming requests for the +%% specified services when using the route_session/2 spawn_opt +%% callback. + +attach(ServiceNames) -> + abcast({attach, node(), ServiceNames}). + +%% detach/1 +%% +%% Deregister the local node as a handler of incoming requests. + +detach(ServiceNames) -> + abcast({detach, node(), ServiceNames}). + +%% abcast/1 + +abcast(T) -> + gen_server:abcast([node() | nodes()], ?SERVER, T), + ok. + +%% attached/1 + +attached(ServiceName) -> + try + ets:lookup_element(?SERVICE_TABLE, ServiceName, 2) + catch + error: badarg -> [] + end. + +%% cast/2 + +cast(Node, T) -> + gen_server:cast({?SERVER, Node}, T). + +%% attach/2 + +attach(Node, S) -> + case sets:to_list(S) of + [] -> + ok; + Services -> + cast(Node, {attach, node(), Services}) + end. + +%% =========================================================================== + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []). @@ -272,11 +452,12 @@ start_link() -> %% binaries that aren't necessarily node names. init([]) -> - ets:new(?TABLE, [set, named_table]), + ets:new(?NODE_TABLE, [set, named_table]), + ets:new(?SERVICE_TABLE, [bag, named_table]), ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]), - ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()], - B <- [?B(N)]]), - {ok, erlang:monotonic_time()}. + ets:insert(?NODE_TABLE, [{?B(N), N} || N <- [node() | nodes()]]), + abcast({attach, node()}), + {ok, sets:new()}. %% handle_call/3 @@ -285,17 +466,49 @@ handle_call(_, _From, S) -> %% handle_cast/2 +%% Remote node is asking which services the local node wants to handle. +handle_cast({attach, Node}, S) + when Node /= node() -> + attach(Node, S), + {noreply, S}; + +%% Node wants to handle incoming requests ... +handle_cast({attach, Node, ServiceNames}, S) -> + ets:insert(?SERVICE_TABLE, [{N, Node} || N <- ServiceNames]), + {noreply, case node() of + Node -> + sets:union(S, sets:from_list(ServiceNames)); + _ -> + S + end}; + +%% ... or not. +handle_cast({detach, Node, ServiceNames}, S) -> + ets:select_delete(?SERVICE_TABLE, [{{'$1', Node}, + [?ORCOND([{'==', '$1', {const, N}} + || N <- ServiceNames])], + [true]}]), + {noreply, case node() of + Node -> + sets:subtract(S, sets:from_list(ServiceNames)); + _ -> + S + end}; + handle_cast(_, S) -> {noreply, S}. %% handle_info/2 handle_info({nodeup, Node, _}, S) -> - ets:insert(?TABLE, {?B(Node), Node}), + ets:insert(?NODE_TABLE, {?B(Node), Node}), + cast(Node, {attach, node()}), %% ask which services remote node handles + attach(Node, S), %% say which service local node handles {noreply, S}; handle_info({nodedown, Node, _}, S) -> - ets:delete(?TABLE, ?B(Node)), + ets:delete(?NODE_TABLE, ?B(Node)), + ets:select_delete(?SERVICE_TABLE, [{{'_', Node}, [], [true]}]), {noreply, S}; handle_info(_, S) -> diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index b1b797aad8..c0643402a6 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -42,6 +42,9 @@ peer_up/1, peer_down/1]). +%% towards diameter_dist +-export([request_info/1]). + %% internal -export([send/1, %% send from remote node request/1, %% process request in handler process @@ -289,8 +292,7 @@ spawn_request(false, _, _, _, _, _, _) -> %% no transport %% handler process dies (in a handle_request callback for example). spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) -> %% Term to pass to request/1 in an appropriate process. Module - %% diameter_dist implements callbacks, and uses the form of the - %% argument tuple constructed below. + %% diameter_dist implements callbacks. ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData}, apply(M, F, [ReqT | A]); @@ -302,6 +304,13 @@ spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) -> end, Opts). +%% request_info/1 +%% +%% Limited request information for diameter_dist. + +request_info({Pkt, _AppT, _Ack, _TPid, _Dict0, RecvData} = _ReqT) -> + {RecvData#recvdata.service_name, Pkt#diameter_packet.bin}. + %% request/1 %% %% Called from a handler process chosen by a transport spawn_opt MFA |