summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/base/diameter_dist.erl293
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl13
2 files changed, 264 insertions, 42 deletions
diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl
index ed2859e914..5c29ea95a4 100644
--- a/lib/diameter/src/base/diameter_dist.erl
+++ b/lib/diameter/src/base/diameter_dist.erl
@@ -28,12 +28,20 @@
%% requests to handler processes (local or remote) in various ways.
%%
-%% spawn_opt callbacks; initial argument constructed in diameter_traffic
+%% spawn_opt callbacks
-export([spawn_local/2,
spawn_local/1,
route_session/2,
route_session/1]).
+%% signal availability for handling incoming requests to route_sesssion/2
+-export([attach/1,
+ detach/1]).
+
+%% consistent hashing
+-export([hash/3, %% for use as default MFA in route_session/2 options map
+ hash/2]). %% arbitrary key/values
+
-include_lib("diameter/include/diameter.hrl").
%% server start
@@ -50,9 +58,21 @@
-type request() :: tuple(). %% callback argument from diameter_traffic
-define(SERVER, ?MODULE). %% server monitoring node connections
--define(TABLE, ?MODULE). %% node() binary -> node() atom
+
+%% Maps a node name binary to the corresponding atom. Used by
+%% route_session/2 to map the optional value of a Session-Id to
+%% node().
+-define(NODE_TABLE, diameter_dist_node).
+
+%% Maps a diameter:service_name() to a node() that has called attach/1
+%% to declare its willingness to handle incoming requests for the
+%% service. Use by route_session/2 in case the optional value mapping
+%% has failed.
+-define(SERVICE_TABLE, diameter_dist_service).
-define(B(A), atom_to_binary(A, utf8)).
+-define(ORCOND(List), list_to_tuple(['orelse', false | List])).
+-define(HASH(T), erlang:phash2(T, 16#100000000)).
%% spawn_local/2
%%
@@ -76,32 +96,82 @@ spawn_local(ReqT) ->
%% route_session/2
%%
-%% Callback that routes requests containing Session-Id AVPs as
-%% returned by diameter:session_id/0 back to the node on which the
-%% function was called. This is only appropriate when sessions are
-%% only initiated by the own (typically client) node, and ids have
-%% been returned from diameter:session_id/0.
+%% Callback that maps the Session-Id of an incoming request to a
+%% handler node.
%%
-%% This can be used with #{search => 0} to route on something other
-%% than Session-Id since default can be an MFA returning a node()
-%% (applied to the incoming diameter_packet record) and dispatch can
-%% be an MFA returning a pid() (applied to Node and the request MFA),
-%% but this is no simpler than just implementing an own spawn_opt
-%% callback. (Except with the default dispatch possibly.)
-
+%% With an options list, maps an id whose optional value is the name
+%% of a connected node to the same node, to handle the case that the
+%% session id has been returned from diameter:session_id/1; otherwise
+%% to a node that has called diameter_dist:attach/1 using the
+%% consistent hashing provided by hash/3, or to the local node() if a
+%% session id could not be extracted or there are no attached nodes. A
+%% handler process is spawned on the selected node using
+%% erlang:spawn_opt/4.
+%%
+%% Different behaviour can be configured by supplying an options map
+%% of the following form:
+%%
+%% #{search => non_neg_integer(),
+%% id => [binary()],
+%% default => discard | local | mfa(),
+%% dispatch => list() | mfa()}
+%%
+%% The search member limits the number of AVPs that are examined in
+%% the message (from the front), to avoid searching entire message in
+%% case it's known that peers follow RFC 6733's recommendation that
+%% Session-Id be placed at the head of a message. The default is to
+%% search the entire message.
+%%
+%% The id member restricts the optional value mapping to session ids
+%% whose DiamterIdentity is one of those specified. Set this to the
+%% list of Diameter identities advertised by the service in question
+%% (typically one) to ensure that only locally generated session ids
+%% are mapped; or to the empty list to disable the mapping.
+%%
+%% The default member determines where to handle a message whose
+%% Session-Id isn't found or whose optional value isn't mapped to the
+%% name of a connected node. The atom local says the local node, an
+%% MFA is invoked on Session-Id | false, the name of the diameter
+%% service, and the message binary, and should return either a node()
+%% or false to discard the message. Defaults to {diameter_dist, hash, []}.
+%%
+%% The dispatch member determines how the pid() of the request handler
+%% process is retrieved. An MFA is applied to a previously selected
+%% node(), and the module, function, and arguments list to apply in
+%% the handler process to handle the request, the MFA being supplied
+%% by diameter, and returns pid() | discard. A list is equivalent to
+%% {erlang, spawn_opt, []}. Defaults to [].
+%%
+%% This can be used with search = 0 to route on something other than
+%% Session-Id, but this is probably no simpler than just implementing
+%% an own spawn_opt callback. (Except with the default dispatch possibly.)
+%%
+%% Note that if the peer is also implemented with OTP diameter and
+%% generating session ids with diameter:session_id/1 then
+%% route_session/2 can map an optional value to a local node that
+%% happens to have the same name as one of the peer's nodes. This
+%% could lead to an uneven distribution; for example, if the peer
+%% nodes are a subset of the local nodes. In practice, it's typically
+%% known if it's peers or the local node originating sessions; if the
+%% former then setting id = [] disables the optional value mapping, if
+%% the latter then setting default = local disables the hashing.
-spec route_session(ReqT :: request(), Opts)
-> discard
| pid()
when Opts :: pos_integer() %% aka #{search => N}
| list() %% aka #{dispatch => Opts}
| #{search => non_neg_integer(), %% limit number of examined AVPs
- default => discard | mfa(), %% return node() | false
- dispatch => list() | mfa()}. %% spawn options or return pid()
+ id => [binary()], %% restrict optional value map on DiamIdent
+ default => local %% handle locally
+ | discard
+ | mfa(), %% return node() | false
+ dispatch => list() %% spawn options
+ | mfa()}. %% (Node, M, F, A) -> pid() | discard
route_session(ReqT, Opts) ->
- #diameter_packet{bin = Bin} = Pkt = element(1, ReqT),
+ {_, Bin} = Info = diameter_traffic:request_info(ReqT),
Sid = session_id(avps(Bin), search(Opts)),
- Node = default(node_of_session_id(Sid), Sid, Opts, Pkt),
+ Node = default(node_of_session_id(Sid, Opts), Sid, Opts, Info),
dispatch(Node, ReqT, dispatch(Opts)).
%% avps/1
@@ -128,24 +198,31 @@ dispatch(Node, ReqT, Opts) ->
route_session(ReqT) ->
route_session(ReqT, []).
-%% node_of_session_id/1
+%% node_of_session_id/2
%%
%% Return the node name encoded as optional value in a Session-Id,
%% assuming the id has been created with diameter:session_id/0. Lookup
%% the node name to ensure we don't convert arbitrary binaries to
%% atom.
-node_of_session_id([_, _, _, Bin]) ->
- case ets:lookup(?TABLE, Bin) of
- [{_, Node}] ->
- Node;
- [] ->
- false
- end;
+node_of_session_id([Id, _, _, Bin], #{id := Ids}) ->
+ lists:member(Id, Ids) andalso nodemap(Bin);
-node_of_session_id(_) ->
+node_of_session_id([_, _, _, Bin], _) ->
+ nodemap(Bin);
+
+node_of_session_id(_, _) ->
false.
+%% nodemap/1
+
+nodemap(Bin) ->
+ try
+ ets:lookup_element(?NODE_TABLE, Bin, 2)
+ catch
+ error: badarg -> false
+ end.
+
%% session_id/2
session_id(_, 0) -> %% give up
@@ -154,7 +231,7 @@ session_id(_, 0) -> %% give up
%% Session-Id = Command Code 263, V-bit = 0.
session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) ->
case Bin of
- <<Avp:Len/binary>> ->
+ <<Avp:Len/binary, _/binary>> ->
<<_:8/binary, Sid/binary>> = Avp,
split(Sid);
_ ->
@@ -248,20 +325,123 @@ search(_) ->
%%
%% Choose a node when Session-Id lookup has failed.
-default(false = No, _, #{default := discard}, _) ->
- No;
-
-default(false, Sid, #{default := {M,F,A}}, Pkt) ->
- apply(M, F, [Sid, Pkt | A]); %% false | node()
+default(false, _, #{default := discard}, _) ->
+ false;
-default(false, _, _, _) ->
+default(false, _, #{default := local}, _) ->
node();
+default(false, Sid, #{default := {M,F,A}}, Info) ->
+ {ServiceName, Bin} = Info,
+ apply(M, F, [Sid, ServiceName, Bin | A]); %% node() | false
+
+default(false, Sid, _, Info) -> %% aka {?MODULE, hash, []}
+ {ServiceName, Bin} = Info,
+ hash(Sid, ServiceName, Bin);
+
default(Node, _, _, _) ->
Node.
%% ===========================================================================
+%% hash/3
+%%
+%% Consistent hashing of Session-Id to an attached node, or the local
+%% node if Session-Id = false or no attached nodes.
+
+hash(Sid, ServiceName, _) ->
+ case false /= Sid andalso attached(ServiceName) of
+ [_|_] = Nodes ->
+ hash(Sid, Nodes);
+ _ ->
+ node()
+ end.
+
+%% hash/2
+%%
+%% Consistent hashing on arbitrary key/values. Returns false if the
+%% list is empty.
+
+%% No key or no values.
+hash(_, []) ->
+ false;
+
+%% Not much choice.
+hash(_, [Value]) ->
+ Value;
+
+%% Hash on a circle and choose the closest predecessor.
+hash(Key, Values) ->
+ Hash = ?HASH(Key),
+ tl(lists:foldl(fun(V,A) ->
+ choose(Hash, [?HASH({Key, V}) | V], A)
+ end,
+ false, %% < list()
+ Values)).
+
+%% choose/3
+
+choose(Hash, [Hash1 | _] = T, [Hash2 | _])
+ when Hash1 =< Hash, Hash < Hash2 ->
+ T;
+
+choose(Hash, [Hash1 | _], [Hash2 | _] = T)
+ when Hash2 =< Hash, Hash < Hash1 ->
+ T;
+
+choose(_, T1, T2) ->
+ max(T1, T2).
+
+%% ===========================================================================
+
+%% attach/1
+%%
+%% Register the local node as a handler of incoming requests for the
+%% specified services when using the route_session/2 spawn_opt
+%% callback.
+
+attach(ServiceNames) ->
+ abcast({attach, node(), ServiceNames}).
+
+%% detach/1
+%%
+%% Deregister the local node as a handler of incoming requests.
+
+detach(ServiceNames) ->
+ abcast({detach, node(), ServiceNames}).
+
+%% abcast/1
+
+abcast(T) ->
+ gen_server:abcast([node() | nodes()], ?SERVER, T),
+ ok.
+
+%% attached/1
+
+attached(ServiceName) ->
+ try
+ ets:lookup_element(?SERVICE_TABLE, ServiceName, 2)
+ catch
+ error: badarg -> []
+ end.
+
+%% cast/2
+
+cast(Node, T) ->
+ gen_server:cast({?SERVER, Node}, T).
+
+%% attach/2
+
+attach(Node, S) ->
+ case sets:to_list(S) of
+ [] ->
+ ok;
+ Services ->
+ cast(Node, {attach, node(), Services})
+ end.
+
+%% ===========================================================================
+
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []).
@@ -272,11 +452,12 @@ start_link() ->
%% binaries that aren't necessarily node names.
init([]) ->
- ets:new(?TABLE, [set, named_table]),
+ ets:new(?NODE_TABLE, [set, named_table]),
+ ets:new(?SERVICE_TABLE, [bag, named_table]),
ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]),
- ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()],
- B <- [?B(N)]]),
- {ok, erlang:monotonic_time()}.
+ ets:insert(?NODE_TABLE, [{?B(N), N} || N <- [node() | nodes()]]),
+ abcast({attach, node()}),
+ {ok, sets:new()}.
%% handle_call/3
@@ -285,17 +466,49 @@ handle_call(_, _From, S) ->
%% handle_cast/2
+%% Remote node is asking which services the local node wants to handle.
+handle_cast({attach, Node}, S)
+ when Node /= node() ->
+ attach(Node, S),
+ {noreply, S};
+
+%% Node wants to handle incoming requests ...
+handle_cast({attach, Node, ServiceNames}, S) ->
+ ets:insert(?SERVICE_TABLE, [{N, Node} || N <- ServiceNames]),
+ {noreply, case node() of
+ Node ->
+ sets:union(S, sets:from_list(ServiceNames));
+ _ ->
+ S
+ end};
+
+%% ... or not.
+handle_cast({detach, Node, ServiceNames}, S) ->
+ ets:select_delete(?SERVICE_TABLE, [{{'$1', Node},
+ [?ORCOND([{'==', '$1', {const, N}}
+ || N <- ServiceNames])],
+ [true]}]),
+ {noreply, case node() of
+ Node ->
+ sets:subtract(S, sets:from_list(ServiceNames));
+ _ ->
+ S
+ end};
+
handle_cast(_, S) ->
{noreply, S}.
%% handle_info/2
handle_info({nodeup, Node, _}, S) ->
- ets:insert(?TABLE, {?B(Node), Node}),
+ ets:insert(?NODE_TABLE, {?B(Node), Node}),
+ cast(Node, {attach, node()}), %% ask which services remote node handles
+ attach(Node, S), %% say which service local node handles
{noreply, S};
handle_info({nodedown, Node, _}, S) ->
- ets:delete(?TABLE, ?B(Node)),
+ ets:delete(?NODE_TABLE, ?B(Node)),
+ ets:select_delete(?SERVICE_TABLE, [{{'_', Node}, [], [true]}]),
{noreply, S};
handle_info(_, S) ->
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index b1b797aad8..c0643402a6 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -42,6 +42,9 @@
peer_up/1,
peer_down/1]).
+%% towards diameter_dist
+-export([request_info/1]).
+
%% internal
-export([send/1, %% send from remote node
request/1, %% process request in handler process
@@ -289,8 +292,7 @@ spawn_request(false, _, _, _, _, _, _) -> %% no transport
%% handler process dies (in a handle_request callback for example).
spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) ->
%% Term to pass to request/1 in an appropriate process. Module
- %% diameter_dist implements callbacks, and uses the form of the
- %% argument tuple constructed below.
+ %% diameter_dist implements callbacks.
ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData},
apply(M, F, [ReqT | A]);
@@ -302,6 +304,13 @@ spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) ->
end,
Opts).
+%% request_info/1
+%%
+%% Limited request information for diameter_dist.
+
+request_info({Pkt, _AppT, _Ack, _TPid, _Dict0, RecvData} = _ReqT) ->
+ {RecvData#recvdata.service_name, Pkt#diameter_packet.bin}.
+
%% request/1
%%
%% Called from a handler process chosen by a transport spawn_opt MFA