author: Maxim Fedorov <dane@whatsapp.com>, 2020-01-25 19:05:41 -0800
committer: Maxim Fedorov <dane@whatsapp.com>, 2020-02-05 21:41:10 -0800
commit: 9279f3ce3e42444191208b8e6f75377b2895adf1
tree: dd9c935462cc872ee1ea55072cef9d1b9af15965
parent: c15eb5fdf721afed280afdbe0fff37706eef979c
pg: distributed named process groups
Replacement for pg2 module. Differences (compared to pg2):
 * non-existent and empty group treated the same (empty list of pids),
   thus create/1 and delete/1 have no effect (and not implemented).
   which_groups() return only non-empty groups
 * no cluster lock required, and no dependency on global
 * all join/leave operations require local process (it's not possible
   to join a process from a different node)
 * multi-join: join/leave several processes with a single call

Empty groups are not supported: unlike a process, group does not have
originating node. So it's possible that during net split one node
deletes the group, that still exists for another partition. pg2 will
re-create deleted group as soon as net split converges, which is quite
unexpected.

Process groups can be organised into multiple scopes. Scopes are
completely independent of each other. A process may join any number of
groups in any number of scopes. Scopes are designed to decouple single
mesh into a set of overlay networks, reducing amount of traffic
required to propagate group membership information.
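The behaviour described in the commit message can be tried out on a single node. The following is a minimal sketch, not part of the commit: the module name pg_usage_sketch, the scope name demo_scope and the group name workers are hypothetical, and the sketch assumes an OTP release that ships the pg module (OTP 23.0 or later). It only uses functions exported by pg.erl in this patch.

```erlang
-module(pg_usage_sketch).
-export([demo/0]).

%% Illustrative only: the scope name demo_scope and the group name
%% workers are hypothetical and not part of the commit.
demo() ->
    %% Start an additional scope outside of any supervision tree.
    {ok, _ScopePid} = pg:start(demo_scope),
    Self = self(),
    %% A group that nobody has joined is treated as empty.
    [] = pg:get_members(demo_scope, workers),
    %% Join once with a pid, then again with a list (multi-join).
    ok = pg:join(demo_scope, workers, Self),
    ok = pg:join(demo_scope, workers, [Self]),
    %% The same process is listed once per join.
    [Self, Self] = pg:get_members(demo_scope, workers),
    [Self, Self] = pg:get_local_members(demo_scope, workers),
    [workers] = pg:which_groups(demo_scope),
    %% Leave as many times as the process joined.
    ok = pg:leave(demo_scope, workers, Self),
    ok = pg:leave(demo_scope, workers, Self),
    %% The group disappears together with its last member.
    [] = pg:which_groups(demo_scope),
    not_joined = pg:leave(demo_scope, workers, Self),
    %% Stop the scope process; its terminate/2 deletes the ETS table.
    ok = gen_server:stop(demo_scope),
    ok.
```

Calling pg_usage_sketch:demo() from a shell exercises the "empty equals non-existent" rule, multi-join, and the not_joined return value without needing a second node. A dedicated scope is used here so that the sketch does not depend on the start_pg kernel parameter.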
-rw-r--r-- lib/kernel/doc/src/Makefile | 1
-rw-r--r-- lib/kernel/doc/src/kernel_app.xml | 10
-rw-r--r-- lib/kernel/doc/src/pg.xml | 189
-rw-r--r-- lib/kernel/doc/src/ref_man.xml | 1
-rw-r--r-- lib/kernel/doc/src/specs.xml | 1
-rw-r--r-- lib/kernel/src/Makefile | 1
-rw-r--r-- lib/kernel/src/kernel.app.src | 2
-rw-r--r-- lib/kernel/src/kernel.erl | 19
-rw-r--r-- lib/kernel/src/pg.erl | 507
-rw-r--r-- lib/kernel/test/Makefile | 1
-rw-r--r-- lib/kernel/test/pg_SUITE.erl | 619
-rw-r--r-- lib/runtime_tools/src/observer_backend.erl | 3
-rw-r--r-- system/doc/design_principles/distributed_applications.xml | 2
-rw-r--r-- system/doc/tutorial/distribution.xml | 2
-rw-r--r-- system/doc/tutorial/overview.xml | 2
15 files changed, 1353 insertions, 7 deletions
diff --git a/lib/kernel/doc/src/Makefile b/lib/kernel/doc/src/Makefile
index fe3dc9dab5..7c0f5b79be 100644
--- a/lib/kernel/doc/src/Makefile
+++ b/lib/kernel/doc/src/Makefile
@@ -67,6 +67,7 @@ XML_REF3_FILES = application.xml \
net_adm.xml \
net_kernel.xml \
os.xml \
+ pg.xml \
pg2.xml \
rpc.xml \
seq_trace.xml \
diff --git a/lib/kernel/doc/src/kernel_app.xml b/lib/kernel/doc/src/kernel_app.xml
index 7f9609d5c1..7ad3d15cd6 100644
--- a/lib/kernel/doc/src/kernel_app.xml
+++ b/lib/kernel/doc/src/kernel_app.xml
@@ -414,6 +414,15 @@ MaxT = TickTime + TickTime / 4</code>
using this service.</p>
<p>Defaults to <c>false</c>.</p>
</item>
+ <tag><c>start_pg = true | false</c></tag>
+ <item>
+ <marker id="start_pg"></marker>
+ <p>Starts the default <c>pg</c> scope server (see
+ <seealso marker="pg"><c>pg(3)</c></seealso>) if
+ the parameter is <c>true</c>. This parameter is to be set to
+ <c>true</c> in an embedded system that uses this service.</p>
+ <p>Defaults to <c>false</c>.</p>
+ </item>
<tag><c>start_pg2 = true | false</c></tag>
<item>
<marker id="start_pg2"></marker>
@@ -556,6 +565,7 @@ erl -kernel logger '[{handler,default,logger_std_h,#{formatter=>{logger_formatte
<seealso marker="logger"><c>logger(3)</c></seealso>,
<seealso marker="net_kernel"><c>net_kernel(3)</c></seealso>,
<seealso marker="os"><c>os(3)</c></seealso>,
+ <seealso marker="pg"><c>pg(3)</c></seealso>,
<seealso marker="pg2"><c>pg2(3)</c></seealso>,
<seealso marker="rpc"><c>rpc(3)</c></seealso>,
<seealso marker="seq_trace"><c>seq_trace(3)</c></seealso>,
diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml
new file mode 100644
index 0000000000..359fbcf72a
--- /dev/null
+++ b/lib/kernel/doc/src/pg.xml
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!DOCTYPE erlref SYSTEM "erlref.dtd">
+
+<!-- %ExternalCopyright% -->
+
+<erlref>
+ <header>
+ <copyright>
+ <year>2020</year><year>2020</year>
+ <holder>Maxim Fedorov, WhatsApp Inc.</holder>
+ </copyright>
+ <legalnotice>
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ </legalnotice>
+
+ <title>pg</title>
+ <prepared>maximfca@gmail.com</prepared>
+ <responsible></responsible>
+ <docno></docno>
+ <approved></approved>
+ <checked></checked>
+ <date></date>
+ <rev>A</rev>
+ <file>pg.xml</file>
+ </header>
+ <module since="OTP 23.0">pg</module>
+ <modulesummary>Distributed named process groups.</modulesummary>
+ <description>
+ <p>This module implements process groups. A message can be sent
+ to one, some, or all group members.</p>
+ <p>A group of processes can be accessed by a common name. For
+ example, if there is a group named <c>foobar</c>, there can be a
+ set of processes (which can be located on different nodes) that
+ are all members of the group <c>foobar</c>. There are no special
+ functions for sending a message to the group. Instead, client
+ functions are to be written with the functions
+ <seealso marker="#get_members/1"><c>get_members/1</c></seealso> and
+ <seealso marker="#get_local_members/1"><c>get_local_members/1</c></seealso>
+ to determine which processes are members of the group.
+ Then the message can be sent to one or more group members.</p>
+ <p>If a member terminates, it is automatically removed from the group.</p>
+
+ <p>A process may join multiple groups. It may join the same group multiple times.
+ Only processes running on the local node can be joined to a group.
+ </p>
+
+ <p>Process Groups implement strong eventual consistency.
+ Unlike <seealso marker="kernel_app"><c>pg2</c></seealso>, which provides
+ strong ordering guarantees, the membership view of Process Groups may
+ temporarily diverge. For example, when processes on <c>node1</c> and
+ <c>node2</c> join concurrently, <c>node3</c> and <c>node4</c> may receive
+ updates in a different order.</p>
+
+ <p> Membership view is not transitive. If <c>node1</c> is not directly
+ connected to <c>node2</c>, they will not see each other's groups. But if
+ both are connected to <c>node3</c>, <c>node3</c> will have the full view. </p>
+
+ <p>Groups are automatically created when any process joins,
+ and are removed when all processes leave the group. A non-existent group is
+ considered empty (containing no processes).</p>
+
+ <p>Process groups can be organised into multiple scopes. Scopes are
+ completely independent of each other. A process may join any
+ number of groups in any number of scopes. Scopes are designed to
+ decouple a single mesh into a set of overlay networks, reducing
+ the amount of traffic required to propagate group membership
+ information. The default scope <c>pg</c> is started automatically
+ when <seealso marker="kernel_app#start_pg"><c>kernel(6)</c></seealso>
+ is configured to do so.
+ </p>
+
+ <note><p>
+ The scope name is used to register the scope process locally, and to name
+ an ETS table. If another process is already registered under this name, or
+ an ETS table with this name already exists, the scope fails to start.</p>
+ <p>Local membership is not preserved if the scope process exits and
+ restarts. This behaviour is different from
+ <seealso marker="kernel_app"><c>pg2</c></seealso>, which recovers
+ local membership from remote nodes.
+ </p></note>
+
+ </description>
+
+ <datatypes>
+ <datatype>
+ <name name="group"/>
+ <desc><p>The identifier of a process group.</p></desc>
+ </datatype>
+ </datatypes>
+
+ <funcs>
+
+ <func>
+ <name name="start_link" arity="0" since="OTP 23.0"/>
+ <fsummary>Start the default <c>pg</c> scope.</fsummary>
+ <desc>
+ <p>Starts the default <c>pg</c> scope within a supervision tree.
+ Kernel may be configured to do it automatically, see the
+ <seealso marker="kernel_app#start_pg"><c>kernel(6)</c></seealso>
+ configuration manual.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="start" arity="1" since="OTP 23.0"/>
+ <name name="start_link" arity="1" since="OTP 23.0"/>
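+ <fsummary>Start an additional scope.</fsummary>
+ <desc>
+ <p>Starts an additional scope. <c>start/1</c> starts the scope process outside of a supervision hierarchy, while <c>start_link/1</c> links it to the calling process.</p>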
+ </desc>
+ </func>
+
+ <func>
+ <name name="join" arity="2" since="OTP 23.0"/>
+ <name name="join" arity="3" since="OTP 23.0"/>
+ <fsummary>Join a process or a list of processes to a group.</fsummary>
+ <desc>
+ <p>Joins a single process or multiple processes to the
+ group <c>Name</c>. A process can join a group many times and
+ must then leave the group the same number of times.</p>
+ <p><c>PidOrPids</c> may contain the same process multiple times.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="leave" arity="2" since="OTP 23.0"/>
+ <name name="leave" arity="3" since="OTP 23.0"/>
+ <fsummary>Make a process leave a group.</fsummary>
+ <desc>
+ <p>Makes the process or processes <c>PidOrPids</c> leave the group <c>Name</c>.
+ If the process is not a member of the group, <c>not_joined</c> is
+ returned.</p>
+ <p>When a list of processes is passed as <c>PidOrPids</c>, the function
+ returns <c>not_joined</c> only when none of the processes in the list
+ has joined the group.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="get_local_members" arity="1" since="OTP 23.0"/>
+ <name name="get_local_members" arity="2" since="OTP 23.0"/>
+ <fsummary>Return all local processes in a group.</fsummary>
+ <desc>
+ <p>Returns all processes running on the local node in the
+ group <c>Name</c>. Processes are returned in no specific order.
+ This function is optimised for speed.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="get_members" arity="1" since="OTP 23.0"/>
+ <name name="get_members" arity="2" since="OTP 23.0"/>
+ <fsummary>Return all processes in a group.</fsummary>
+ <desc>
+ <p>Returns all processes in the group <c>Name</c>.
+ Processes are returned in no specific order.
+ This function is optimised for speed.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="which_groups" arity="0" since="OTP 23.0"/>
+ <name name="which_groups" arity="1" since="OTP 23.0"/>
+ <fsummary>Return a list of all known groups.</fsummary>
+ <desc>
+ <p>Returns a list of all known groups.</p>
+ </desc>
+ </func>
+
+ </funcs>
+
+ <section>
+ <title>See Also</title>
+ <p><seealso marker="kernel_app"><c>kernel(6)</c></seealso></p>
+ </section>
+</erlref>
+
diff --git a/lib/kernel/doc/src/ref_man.xml b/lib/kernel/doc/src/ref_man.xml
index 9df51dee22..2e3a749949 100644
--- a/lib/kernel/doc/src/ref_man.xml
+++ b/lib/kernel/doc/src/ref_man.xml
@@ -64,6 +64,7 @@
<xi:include href="net_adm.xml"/>
<xi:include href="net_kernel.xml"/>
<xi:include href="os.xml"/>
+ <xi:include href="pg.xml"/>
<xi:include href="pg2.xml"/>
<xi:include href="rpc.xml"/>
<xi:include href="seq_trace.xml"/>
diff --git a/lib/kernel/doc/src/specs.xml b/lib/kernel/doc/src/specs.xml
index 9e258910db..27bba6e78f 100644
--- a/lib/kernel/doc/src/specs.xml
+++ b/lib/kernel/doc/src/specs.xml
@@ -30,6 +30,7 @@
<xi:include href="../specs/specs_net_adm.xml"/>
<xi:include href="../specs/specs_net_kernel.xml"/>
<xi:include href="../specs/specs_os.xml"/>
+ <xi:include href="../specs/specs_pg.xml"/>
<xi:include href="../specs/specs_pg2.xml"/>
<xi:include href="../specs/specs_rpc.xml"/>
<xi:include href="../specs/specs_seq_trace.xml"/>
diff --git a/lib/kernel/src/Makefile b/lib/kernel/src/Makefile
index 2d2b84c206..3b93d84e02 100644
--- a/lib/kernel/src/Makefile
+++ b/lib/kernel/src/Makefile
@@ -128,6 +128,7 @@ MODULES = \
net_adm \
net_kernel \
os \
+ pg \
pg2 \
ram_file \
rpc \
diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src
index 8456d2a640..08ad4ed7da 100644
--- a/lib/kernel/src/kernel.app.src
+++ b/lib/kernel/src/kernel.app.src
@@ -103,6 +103,7 @@
inet_tcp,
inet_udp,
inet_sctp,
+ pg,
pg2,
raw_file_io,
raw_file_io_compressed,
@@ -143,6 +144,7 @@
ddll_server,
erl_epmd,
inet_db,
+ pg,
pg2]},
{applications, []},
{env, [{logger_level, notice},
diff --git a/lib/kernel/src/kernel.erl b/lib/kernel/src/kernel.erl
index 8877ceea8e..83f3fbecd5 100644
--- a/lib/kernel/src/kernel.erl
+++ b/lib/kernel/src/kernel.erl
@@ -64,7 +64,7 @@ config_change(Changed, New, Removed) ->
%%% (file,code, | erl_dist (A)| | safe_sup (1)|
%%% rpc, ...) ------------- -------------
%%% | |
-%%% (net_kernel, (disk_log, pg2,
+%%% (net_kernel, (disk_log, pg,
%%% auth, ...) ...)
%%%
%%% The rectangular boxes are supervisors. All supervisors except
@@ -180,7 +180,7 @@ init(safe) ->
Boot = start_boot_server(),
DiskLog = start_disk_log(),
- Pg2 = start_pg2(),
+ Pg = start_pg2() ++ start_pg(),
%% Run the on_load handlers for all modules that have been
%% loaded so far. Running them at this point means that
@@ -188,7 +188,7 @@ init(safe) ->
%% (and in particular call code:priv_dir/1 or code:lib_dir/1).
init:run_on_load_handlers(),
- {ok, {SupFlags, Boot ++ DiskLog ++ Pg2}}.
+ {ok, {SupFlags, Boot ++ DiskLog ++ Pg}}.
start_distribution() ->
Rpc = #{id => rex,
@@ -279,6 +279,19 @@ start_disk_log() ->
[]
end.
+start_pg() ->
+ case application:get_env(kernel, start_pg) of
+ {ok, true} ->
+ [#{id => pg,
+ start => {pg, start_link, []},
+ restart => permanent,
+ shutdown => 1000,
+ type => worker,
+ modules => [pg]}];
+ _ ->
+ []
+ end.
+
start_pg2() ->
case application:get_env(kernel, start_pg2) of
{ok, true} ->
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
new file mode 100644
index 0000000000..0668dd1f79
--- /dev/null
+++ b/lib/kernel/src/pg.erl
@@ -0,0 +1,507 @@
+%%
+%%
+%% Copyright WhatsApp Inc. and its affiliates. All rights reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%%
+%%-------------------------------------------------------------------
+%%
+%% @author Maxim Fedorov <maximfca@gmail.com>
+%% Process Groups with eventually consistent membership.
+%%
+%% Differences (compared to pg2):
+%% * non-existent and empty group treated the same (empty list of pids),
+%% thus create/1 and delete/1 have no effect (and not implemented).
+%% which_groups() return only non-empty groups
+%% * no cluster lock required, and no dependency on global
+%% * all join/leave operations require local process (it's not possible to join
+%% a process from a different node)
+%% * multi-join: join/leave several processes with a single call
+%%
+%% Why empty groups are not supported:
+%% Unlike a process, a group does not have an originating node. So it's
+%% possible that during a net split one node deletes a group that still
+%% exists in another partition. pg2 will recover the group as soon as the
+%% net split converges, which is quite unexpected.
+%%
+%% Exchange protocol:
+%% * when pg process starts, it broadcasts
+%% 'discover' message to all nodes in the cluster
+%% * when pg server receives 'discover', it responds with 'sync' message
+%% containing list of groups with all local processes, and starts to
+%% monitor process that sent 'discover' message (assuming it is a part
+%% of an overlay network)
+%% * every pg process monitors 'nodeup' messages to attempt discovery for
+%% nodes that are (re)joining the cluster
+%%
+%% Leave/join operations:
+%% * processes joining the group are monitored on the local node
+%% * when process exits (without leaving groups prior to exit), local
+%% instance of pg scoped process detects this and sends 'leave' to
+%% all nodes in an overlay network (no remote monitoring done)
+%% * all leave/join operations are serialised through pg server process
+%%
+-module(pg).
+
+%% API: default scope
+-export([
+ start_link/0,
+
+ join/2,
+ leave/2,
+ get_members/1,
+ get_local_members/1,
+ which_groups/0,
+ which_local_groups/0
+]).
+
+%% Scoped API: overlay networks
+-export([
+ start/1,
+ start_link/1,
+
+ join/3,
+ leave/3,
+ get_members/2,
+ get_local_members/2,
+ which_groups/1,
+ which_local_groups/1
+]).
+
+%% gen_server callbacks
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2
+]).
+
+%% Types
+-type group() :: any().
+
+%% Default scope started by kernel app
+-define(DEFAULT_SCOPE, ?MODULE).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server and links it to calling process.
+%% Uses default scope, which is the same as the module name.
+-spec start_link() -> {ok, pid()} | {error, any()}.
+start_link() ->
+ start_link(?DEFAULT_SCOPE).
+
+%% @doc
+%% Starts the server outside of supervision hierarchy.
+-spec start(Scope :: atom()) -> {ok, pid()} | {error, any()}.
+start(Scope) when is_atom(Scope) ->
+ gen_server:start({local, Scope}, ?MODULE, [Scope], []).
+
+%% @doc
+%% Starts the server and links it to calling process.
+%% Scope name is supplied.
+-spec start_link(Scope :: atom()) -> {ok, pid()} | {error, any()}.
+start_link(Scope) when is_atom(Scope) ->
+ gen_server:start_link({local, Scope}, ?MODULE, [Scope], []).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Joins a single process or a list of processes to the group.
+%% Group is created automatically.
+%% Processes must be local to this node.
+-spec join(Group :: group(), PidOrPids :: pid() | [pid()]) -> ok.
+join(Group, PidOrPids) ->
+ join(?DEFAULT_SCOPE, Group, PidOrPids).
+
+-spec join(Scope :: atom(), Group :: group(), PidOrPids :: pid() | [pid()]) -> ok.
+join(Scope, Group, PidOrPids) ->
+ Node = node(),
+ is_list(PidOrPids) andalso [error({nolocal, Pid}) || Pid <- PidOrPids, node(Pid) =/= Node orelse not is_pid(Pid)],
+ gen_server:call(Scope, {join_local, Group, PidOrPids}, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Makes a single process or a list of processes leave the group.
+%% Processes must be local to this node.
+-spec leave(Group :: group(), PidOrPids :: pid() | [pid()]) -> ok | not_joined.
+leave(Group, PidOrPids) ->
+ leave(?DEFAULT_SCOPE, Group, PidOrPids).
+
+-spec leave(Scope :: atom(), Group :: group(), Pid :: pid() | [pid()]) -> ok | not_joined.
+leave(Scope, Group, PidOrPids) ->
+ Node = node(),
+ is_list(PidOrPids) andalso [error({nolocal, Pid}) || Pid <- PidOrPids, node(Pid) =/= Node orelse not is_pid(Pid)],
+ gen_server:call(Scope, {leave_local, Group, PidOrPids}, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns all processes in a group
+-spec get_members(Group :: group()) -> [pid()].
+get_members(Group) ->
+ get_members(?DEFAULT_SCOPE, Group).
+
+-spec get_members(Scope :: atom(), Group :: group()) -> [pid()].
+get_members(Scope, Group) ->
+ try
+ ets:lookup_element(Scope, Group, 2)
+ catch
+ error:badarg ->
+ []
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns processes in a group, running on local node.
+-spec get_local_members(Group :: group()) -> [pid()].
+get_local_members(Group) ->
+ get_local_members(?DEFAULT_SCOPE, Group).
+
+-spec get_local_members(Scope :: atom(), Group :: group()) -> [pid()].
+get_local_members(Scope, Group) ->
+ try
+ ets:lookup_element(Scope, Group, 3)
+ catch
+ error:badarg ->
+ []
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns a list of all known groups.
+-spec which_groups() -> [Group :: group()].
+which_groups() ->
+ which_groups(?DEFAULT_SCOPE).
+
+-spec which_groups(Scope :: atom()) -> [Group :: group()].
+which_groups(Scope) when is_atom(Scope) ->
+ [G || [G] <- ets:match(Scope, {'$1', '_', '_'})].
+
+%%--------------------------------------------------------------------
+%% @private
+%% Returns a list of groups that have any local processes joined.
+-spec which_local_groups() -> [Group :: group()].
+which_local_groups() ->
+ which_local_groups(?DEFAULT_SCOPE).
+
+-spec which_local_groups(Scope :: atom()) -> [Group :: group()].
+which_local_groups(Scope) when is_atom(Scope) ->
+ ets:select(Scope, [{{'$1', '_', '$2'}, [{'=/=', '$2', []}], ['$1']}]).
+
+%%--------------------------------------------------------------------
+%% Internal implementation
+
+%% gen_server implementation
+-record(state, {
+ %% ETS table name, and also the registered process name (self())
+ scope :: atom(),
+ %% monitored local processes and groups they joined
+ monitors = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}},
+ %% remote node: scope process monitor and map of groups to pids for fast sync routine
+ nodes = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}
+}).
+
+-type state() :: #state{}.
+
+-spec init([Scope :: atom()]) -> {ok, state()}.
+init([Scope]) ->
+ ok = net_kernel:monitor_nodes(true),
+ %% discover all nodes in the cluster
+ broadcast([{Scope, Node} || Node <- nodes()], {discover, self()}),
+ Scope = ets:new(Scope, [set, protected, named_table, {read_concurrency, true}]),
+ {ok, #state{scope = Scope}}.
+
+-spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
+ | {leave_local, Group :: group(), Pid :: pid()},
+ From :: {pid(),Tag :: any()},
+ State :: state()) -> {reply, ok | not_joined, state()}.
+
+handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) ->
+ NewMons = join_monitors(PidOrPids, Group, Monitors),
+ join_local_group(Scope, Group, PidOrPids),
+ broadcast(maps:keys(Nodes), {join, self(), Group, PidOrPids}),
+ {reply, ok, State#state{monitors = NewMons}};
+
+handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) ->
+ case leave_monitors(PidOrPids, Group, Monitors) of
+ Monitors ->
+ {reply, not_joined, State};
+ NewMons ->
+ leave_local_group(Scope, Group, PidOrPids),
+ broadcast(maps:keys(Nodes), {leave, self(), PidOrPids, [Group]}),
+ {reply, ok, State#state{monitors = NewMons}}
+ end;
+
+handle_call(_Request, _From, _S) ->
+ error(badarg).
+
+-spec handle_cast(
+ {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
+ State :: state()) -> {noreply, state()}.
+
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, nodes = Nodes} = State) ->
+ {noreply, State#state{nodes = handle_sync(Scope, Peer, Nodes, Groups)}};
+
+handle_cast(_, _State) ->
+ error(badarg).
+
+-spec handle_info(
+ {discover, Peer :: pid()} |
+ {join, Peer :: pid(), group(), pid() | [pid()]} |
+ {leave, Peer :: pid(), pid() | [pid()], [group()]} |
+ {'DOWN', reference(), process, pid(), term()} |
+ {nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}.
+
+%% remote pid or several pids joining the group
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes} = State) ->
+ join_remote(Scope, Group, PidOrPids),
+ % store remote group => pids map for fast sync operation
+ {MRef, RemoteGroups} = maps:get(Peer, Nodes),
+ NewRemoteGroups = join_remote_map(Group, PidOrPids, RemoteGroups),
+ {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteGroups}}}};
+
+%% remote pid leaving (multiple groups at once)
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Nodes} = State) ->
+ _ = leave_remote(Scope, PidOrPids, Groups),
+ {MRef, RemoteMap} = maps:get(Peer, Nodes),
+ NewRemoteMap = lists:foldl(
+ fun (Group, Acc) ->
+ case maps:get(Group, Acc) of
+ PidOrPids ->
+ Acc;
+ [PidOrPids] ->
+ Acc;
+ Existing when is_pid(PidOrPids) ->
+ Acc#{Group => lists:delete(PidOrPids, Existing)};
+ Existing ->
+ Acc#{Group => Existing-- PidOrPids}
+ end
+ end, RemoteMap, Groups),
+ {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteMap}}}};
+
+%% we're being discovered, let's exchange!
+handle_info({discover, Peer}, #state{scope = Scope, nodes = Nodes} = State) ->
+ gen_server:cast(Peer, {sync, self(), all_local_pids(Scope)}),
+ %% do we know who is looking for us?
+ case maps:is_key(Peer, Nodes) of
+ true ->
+ {noreply, State};
+ false ->
+ MRef = monitor(process, Peer),
+ erlang:send(Peer, {discover, self()}, [noconnect]),
+ {noreply, State#state{nodes = Nodes#{Peer => {MRef, #{}}}}}
+ end;
+
+%% handle local process exit
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) when node(Pid) =:= node() ->
+ case maps:take(Pid, Monitors) of
+ error ->
+ %% this can only happen when leave request and 'DOWN' are in pg queue
+ {noreply, State};
+ {{MRef, Groups}, NewMons} ->
+ [leave_local_group(Scope, Group, Pid) || Group <- Groups],
+ %% send update to all nodes
+ broadcast(maps:keys(Nodes), {leave, self(), Pid, Groups}),
+ {noreply, State#state{monitors = NewMons}}
+ end;
+
+%% handle remote node down or leaving overlay network
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, nodes = Nodes} = State) ->
+ {{MRef, RemoteMap}, NewNodes} = maps:take(Pid, Nodes),
+ _ = maps:map(fun (Group, Pids) -> leave_remote(Scope, Pids, [Group]) end, RemoteMap),
+ {noreply, State#state{nodes = NewNodes}};
+
+%% nodedown: ignore, and wait for 'DOWN' signal for monitored process
+handle_info({nodedown, _Node}, State) ->
+ {noreply, State};
+
+%% nodeup: discover if remote node participates in the overlay network
+handle_info({nodeup, Node}, #state{scope = Scope} = State) ->
+ {Scope, Node} ! {discover, self()},
+ {noreply, State};
+
+handle_info(_Info, _State) ->
+ error(badarg).
+
+-spec terminate(Reason :: any(), State :: state()) -> true.
+terminate(_Reason, #state{scope = Scope}) ->
+ true = ets:delete(Scope).
+
+%%--------------------------------------------------------------------
+%% Internal implementation
+
+%% Override all knowledge of the remote node with information it sends
+%% to local node. Current implementation must do the full table scan
+%% to remove stale pids (just as for 'nodedown').
+handle_sync(Scope, Peer, Nodes, Groups) ->
+ %% can't use maps:get() because it evaluates 'default' value first,
+ %% and in this case monitor() call has side effect.
+ {MRef, RemoteGroups} =
+ case maps:find(Peer, Nodes) of
+ error ->
+ {monitor(process, Peer), #{}};
+ {ok, MRef0} ->
+ MRef0
+ end,
+ %% sync RemoteMap and transform ETS table
+ _ = sync_groups(Scope, RemoteGroups, Groups),
+ Nodes#{Peer => {MRef, maps:from_list(Groups)}}.
+
+sync_groups(Scope, RemoteGroups, []) ->
+ %% leave all missing groups
+ [leave_remote(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) ->
+ case maps:take(Group, RemoteGroups) of
+ {Pids, NewRemoteGroups} ->
+ sync_groups(Scope, NewRemoteGroups, Tail);
+ {OldPids, NewRemoteGroups} ->
+ [{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group),
+ %% should be really rare...
+ AllNewPids = Pids ++ AllOldPids -- OldPids,
+ true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
+ sync_groups(Scope, NewRemoteGroups, Tail);
+ error ->
+ join_remote(Scope, Group, Pids),
+ sync_groups(Scope, RemoteGroups, Tail)
+ end.
+
+join_monitors(Pid, Group, Monitors) when is_pid(Pid) ->
+ case maps:find(Pid, Monitors) of
+ {ok, {MRef, Groups}} ->
+ maps:put(Pid, {MRef, [Group | Groups]}, Monitors);
+ error ->
+ MRef = erlang:monitor(process, Pid),
+ Monitors#{Pid => {MRef, [Group]}}
+ end;
+join_monitors([], _Group, Monitors) ->
+ Monitors;
+join_monitors([Pid | Tail], Group, Monitors) ->
+ join_monitors(Tail, Group, join_monitors(Pid, Group, Monitors)).
+
+join_local_group(Scope, Group, Pid) when is_pid(Pid) ->
+ case ets:lookup(Scope, Group) of
+ [{Group, All, Local}] ->
+ ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
+ [] ->
+ ets:insert(Scope, {Group, [Pid], [Pid]})
+ end;
+join_local_group(Scope, Group, Pids) ->
+ case ets:lookup(Scope, Group) of
+ [{Group, All, Local}] ->
+ ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
+ [] ->
+ ets:insert(Scope, {Group, Pids, Pids})
+ end.
+
+join_remote(Scope, Group, Pid) when is_pid(Pid) ->
+ case ets:lookup(Scope, Group) of
+ [{Group, All, Local}] ->
+ ets:insert(Scope, {Group, [Pid | All], Local});
+ [] ->
+ ets:insert(Scope, {Group, [Pid], []})
+ end;
+join_remote(Scope, Group, Pids) ->
+ case ets:lookup(Scope, Group) of
+ [{Group, All, Local}] ->
+ ets:insert(Scope, {Group, Pids ++ All, Local});
+ [] ->
+ ets:insert(Scope, {Group, Pids, []})
+ end.
+
+join_remote_map(Group, Pid, RemoteGroups) when is_pid(Pid) ->
+ maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
+join_remote_map(Group, Pids, RemoteGroups) ->
+ maps:update_with(Group, fun (List) -> Pids ++ List end, Pids, RemoteGroups).
+
+leave_monitors(Pid, Group, Monitors) when is_pid(Pid) ->
+ case maps:find(Pid, Monitors) of
+ {ok, {MRef, [Group]}} ->
+ erlang:demonitor(MRef),
+ maps:remove(Pid, Monitors);
+ {ok, {MRef, Groups}} ->
+ case lists:member(Group, Groups) of
+ true ->
+ maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Monitors);
+ false ->
+ Monitors
+ end;
+ _ ->
+ Monitors
+ end;
+leave_monitors([], _Group, Monitors) ->
+ Monitors;
+leave_monitors([Pid | Tail], Group, Monitors) ->
+ leave_monitors(Tail, Group, leave_monitors(Pid, Group, Monitors)).
+
+leave_local_group(Scope, Group, Pid) when is_pid(Pid) ->
+ case ets:lookup(Scope, Group) of
+ [{Group, [Pid], [Pid]}] ->
+ ets:delete(Scope, Group);
+ [{Group, All, Local}] ->
+ ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)});
+ [] ->
+ %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leaving.
+ true
+ end;
+leave_local_group(Scope, Group, Pids) ->
+ case ets:lookup(Scope, Group) of
+ [{Group, All, Local}] ->
+ case All -- Pids of
+ [] ->
+ ets:delete(Scope, Group);
+ NewAll ->
+ ets:insert(Scope, {Group, NewAll, Local -- Pids})
+ end;
+ [] ->
+ true
+ end.
+
+leave_remote(Scope, Pid, Groups) when is_pid(Pid) ->
+ _ = [
+ case ets:lookup(Scope, Group) of
+ [{Group, [Pid], []}] ->
+ ets:delete(Scope, Group);
+ [{Group, All, Local}] ->
+ ets:insert(Scope, {Group, lists:delete(Pid, All), Local});
+ [] ->
+ true
+ end ||
+ Group <- Groups];
+leave_remote(Scope, Pids, Groups) ->
+ _ = [
+ case ets:lookup(Scope, Group) of
+ [{Group, All, Local}] ->
+ case All -- Pids of
+ [] when Local =:= [] ->
+ ets:delete(Scope, Group);
+ NewAll ->
+ ets:insert(Scope, {Group, NewAll, Local})
+ end;
+ [] ->
+ true
+ end ||
+ Group <- Groups].
+
+all_local_pids(Scope) ->
+ %% selector: ets:fun2ms(fun({N,_,L}) when L =/=[] -> {N,L}end).
+ ets:select(Scope, [{{'$1','_','$2'},[{'=/=','$2',[]}],[{{'$1','$2'}}]}]).
+
+%% Works as gen_server:abcast(), but accepts a list of processes
+%% instead of nodes list.
+broadcast([], _Msg) ->
+ ok;
+broadcast([Dest | Tail], Msg) ->
+ %% do not use 'nosuspend', as it will lead to missing
+ %% join/leave messages when dist buffer is full
+ erlang:send(Dest, Msg, [noconnect]),
+ broadcast(Tail, Msg).
diff --git a/lib/kernel/test/Makefile b/lib/kernel/test/Makefile
index bd1590ee8f..10c55907e9 100644
--- a/lib/kernel/test/Makefile
+++ b/lib/kernel/test/Makefile
@@ -85,6 +85,7 @@ MODULES= \
logger_test_lib \
net_SUITE \
os_SUITE \
+ pg_SUITE \
pg2_SUITE \
seq_trace_SUITE \
wrap_log_reader_SUITE \
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
new file mode 100644
index 0000000000..bdb7abe99d
--- /dev/null
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -0,0 +1,619 @@
+%%
+%%
+%% Copyright WhatsApp Inc. and its affiliates. All rights reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%%-------------------------------------------------------------------
+%% @author Maxim Fedorov <maximfca@gmail.com>
+%% Process Groups smoke test.
+-module(pg_SUITE).
+-author("maximfca@gmail.com").
+
+%% Test server callbacks
+-export([
+ suite/0,
+ all/0,
+ groups/0,
+ init_per_suite/1,
+ end_per_suite/1,
+ init_per_testcase/2,
+ end_per_testcase/2,
+ stop_proc/1
+]).
+
+%% Test cases exports
+-export([
+ pg/0, pg/1,
+ errors/0, errors/1,
+ leave_exit_race/0, leave_exit_race/1,
+ single/0, single/1,
+ two/1,
+ thundering_herd/0, thundering_herd/1,
+ initial/1,
+ netsplit/1,
+ trisplit/1,
+ foursplit/1,
+ exchange/1,
+ nolocal/1,
+ double/1,
+ scope_restart/1,
+ missing_scope_join/1,
+ disconnected_start/1,
+ forced_sync/0, forced_sync/1,
+ group_leave/1
+]).
+
+-export([
+ control/1,
+ controller/3
+]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+suite() ->
+ [{timetrap, {seconds, 10}}].
+
+init_per_suite(Config) ->
+ case erlang:is_alive() of
+ false ->
+ %% verify epmd running (otherwise next call fails)
+ (erl_epmd:names("localhost") =:= {error, address}) andalso ([] = os:cmd("epmd -daemon")),
+ %% start a random node name
+ NodeName = list_to_atom(lists:concat([atom_to_list(?MODULE), "_", os:getpid()])),
+ {ok, Pid} = net_kernel:start([NodeName, shortnames]),
+ [{distribution, Pid} | Config];
+ true ->
+ Config
+ end.
+
+end_per_suite(Config) ->
+ is_pid(proplists:get_value(distribution, Config)) andalso net_kernel:stop().
+
+init_per_testcase(TestCase, Config) ->
+ {ok, _Pid} = pg:start_link(TestCase),
+ Config.
+
+end_per_testcase(TestCase, _Config) ->
+ gen_server:stop(TestCase),
+ ok.
+
+all() ->
+ [{group, basic}, {group, cluster}, {group, performance}].
+
+groups() ->
+ [
+ {basic, [parallel], [errors, pg, leave_exit_race, single]},
+ {performance, [sequential], [thundering_herd]},
+ {cluster, [parallel], [two, initial, netsplit, trisplit, foursplit,
+ exchange, nolocal, double, scope_restart, missing_scope_join,
+ disconnected_start, forced_sync, group_leave]}
+ ].
+
+%%--------------------------------------------------------------------
+%% TEST CASES
+
+pg() ->
+ [{doc, "This test must be named pg, to stay in line with the default scope"}].
+
+pg(_Config) ->
+ ?assertNotEqual(undefined, whereis(?FUNCTION_NAME)), %% ensure scope was started
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, self())),
+ ?assertEqual([self()], pg:get_local_members(?FUNCTION_NAME)),
+ ?assertEqual([?FUNCTION_NAME], pg:which_groups()),
+ ?assertEqual([?FUNCTION_NAME], pg:which_local_groups()),
+ ?assertEqual(ok, pg:leave(?FUNCTION_NAME, self())),
+ ?assertEqual([], pg:get_members(?FUNCTION_NAME)),
+ ?assertEqual([], pg:which_groups(?FUNCTION_NAME)),
+ ?assertEqual([], pg:which_local_groups(?FUNCTION_NAME)).
+
+errors() ->
+ [{doc, "Tests that errors are handled as expected, for example pg server crashes when it needs to"}].
+
+errors(_Config) ->
+ %% kill with 'info' and 'cast'
+ ?assertException(error, badarg, pg:handle_info(garbage, garbage)),
+ ?assertException(error, badarg, pg:handle_cast(garbage, garbage)),
+ %% kill with call
+ {ok, _Pid} = pg:start(second),
+ ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage, 100)).
+
+leave_exit_race() ->
+ [{doc, "Tests that pg correctly handles situation when leave and 'DOWN' messages are both in pg queue"}].
+
+leave_exit_race(Config) when is_list(Config) ->
+ process_flag(priority, high),
+ [
+ begin
+ Pid = spawn(fun () -> ok end),
+ pg:join(leave_exit_race, test, Pid),
+ pg:leave(leave_exit_race, test, Pid)
+ end
+ || _ <- lists:seq(1, 100)].
+
+single() ->
+ [{doc, "Tests single node groups"}, {timetrap, {seconds, 5}}].
+
+single(Config) when is_list(Config) ->
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [self(), self()])),
+ ?assertEqual([self(), self(), self()], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ ?assertEqual([self(), self(), self()], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, '$missing$', self())),
+ ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, [self(), self()])),
+ ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
+ ?assertEqual([], pg:which_groups(?FUNCTION_NAME)),
+ ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ %% double
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
+ Pid = erlang:spawn(forever()),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
+ Expected = lists:sort([Pid, self()]),
+ ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+ ?assertEqual(Expected, lists:sort(pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+
+ stop_proc(Pid),
+ sync(?FUNCTION_NAME),
+ ?assertEqual([self()], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
+ ok.
+
+two(Config) when is_list(Config) ->
+ {TwoPeer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ Pid = erlang:spawn(forever()),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
+ ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ %% first RPC must be serialised
+ sync({?FUNCTION_NAME, TwoPeer}),
+ ?assertEqual([Pid], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
+ ?assertEqual([], rpc:call(TwoPeer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
+ stop_proc(Pid),
+ %% again, must be serialised
+ sync(?FUNCTION_NAME),
+ ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ ?assertEqual([], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
+
+ Pid2 = erlang:spawn(TwoPeer, forever()),
+ Pid3 = erlang:spawn(TwoPeer, forever()),
+ ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid2])),
+ ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid3])),
+ %% serialise through the *other* node
+ sync({?FUNCTION_NAME, TwoPeer}),
+ ?assertEqual(lists:sort([Pid2, Pid3]),
+ lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+ %% stop the peer
+ stop_node(TwoPeer, Socket),
+ %% hope that 'nodedown' comes before we route our request
+ sync(?FUNCTION_NAME),
+ ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ ok.
+
+thundering_herd() ->
+ [{doc, "Thousands of overlay network nodes sending sync to us, and we time out!"}, {timetrap, {seconds, 5}}].
+
+thundering_herd(Config) when is_list(Config) ->
+ GroupCount = 10000,
+ SyncCount = 2000,
+ %% make up a large amount of groups
+ [pg:join(?FUNCTION_NAME, {group, Seq}, self()) || Seq <- lists:seq(1, GroupCount)],
+ %% initiate a few syncs - and those are really slow...
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ PeerPid = erlang:spawn(Peer, forever()),
+ PeerPg = rpc:call(Peer, erlang, whereis, [?FUNCTION_NAME], 1000),
+ %% WARNING: the code below is white-box and relies on pg internals
+ FakeSync = [{{group, 1}, [PeerPid, PeerPid]}],
+ [gen_server:cast(?FUNCTION_NAME, {sync, PeerPg, FakeSync}) || _ <- lists:seq(1, SyncCount)],
+ %% next call must not timetrap, otherwise test fails
+ pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self()),
+ stop_node(Peer, Socket).
+
+initial(Config) when is_list(Config) ->
+ Pid = erlang:spawn(forever()),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
+ ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ %% first RPC must be serialised
+ sync({?FUNCTION_NAME, Peer}),
+ ?assertEqual([Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
+
+ ?assertEqual([], rpc:call(Peer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
+ stop_proc(Pid),
+ sync({?FUNCTION_NAME, Peer}),
+ ?assertEqual([], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
+ stop_node(Peer, Socket),
+ ok.
+
+netsplit(Config) when is_list(Config) ->
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ ?assertEqual(Peer, rpc(Socket, erlang, node, [])), %% just to test RPC
+ RemoteOldPid = erlang:spawn(Peer, forever()),
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, '$invisible', RemoteOldPid])),
+ %% hohoho, partition!
+ net_kernel:disconnect(Peer),
+ ?assertEqual(Peer, rpc(Socket, erlang, node, [])), %% just to ensure RPC still works
+ RemotePid = rpc(Socket, erlang, spawn, [forever()]),
+ ?assertEqual([], rpc(Socket, erlang, nodes, [])),
+ ?assertEqual(ok, rpc(Socket, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), %% join - in a partition!
+
+ ?assertEqual(ok, rpc(Socket, pg, leave, [?FUNCTION_NAME, '$invisible', RemoteOldPid])),
+ ?assertEqual(ok, rpc(Socket, pg, join, [?FUNCTION_NAME, '$visible', RemoteOldPid])),
+ ?assertEqual([RemoteOldPid], rpc(Socket, pg, get_local_members, [?FUNCTION_NAME, '$visible'])),
+ %% join locally too
+ LocalPid = erlang:spawn(forever()),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, LocalPid)),
+
+ ?assertNot(lists:member(Peer, nodes())), %% should be no nodes in the cluster
+
+ pong = net_adm:ping(Peer),
+ %% now ensure sync happened
+ Pids = lists:sort([RemotePid, LocalPid]),
+ sync({?FUNCTION_NAME, Peer}),
+ ?assertEqual(Pids, lists:sort(rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME]))),
+ ?assertEqual([RemoteOldPid], pg:get_members(?FUNCTION_NAME, '$visible')),
+ stop_node(Peer, Socket),
+ ok.
+
+trisplit(Config) when is_list(Config) ->
+ {Peer, Socket1} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ _PeerPid1 = erlang:spawn(Peer, forever()),
+ PeerPid2 = erlang:spawn(Peer, forever()),
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, three, PeerPid2])),
+ net_kernel:disconnect(Peer),
+ ?assertEqual(true, net_kernel:connect_node(Peer)),
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, one, PeerPid2])),
+ %% now ensure sync happened
+ {Peer2, Socket2} = spawn_node(?FUNCTION_NAME, trisplit_second),
+ ?assertEqual(true, rpc:call(Peer2, net_kernel, connect_node, [Peer])),
+ ?assertEqual(lists:sort([node(), Peer]), lists:sort(rpc:call(Peer2, erlang, nodes, []))),
+ sync({?FUNCTION_NAME, Peer2}),
+ ?assertEqual([PeerPid2], rpc:call(Peer2, pg, get_members, [?FUNCTION_NAME, one])),
+ stop_node(Peer, Socket1),
+ stop_node(Peer2, Socket2),
+ ok.
+
+foursplit(Config) when is_list(Config) ->
+ Pid = erlang:spawn(forever()),
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, one, Pid)),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, two, Pid)),
+ PeerPid1 = spawn(Peer, forever()),
+ ?assertEqual(ok, pg:leave(?FUNCTION_NAME, one, Pid)),
+ ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, three, Pid)),
+ net_kernel:disconnect(Peer),
+ ?assertEqual(ok, rpc(Socket, ?MODULE, stop_proc, [PeerPid1])),
+ ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, three, Pid)),
+ ?assertEqual(true, net_kernel:connect_node(Peer)),
+ ?assertEqual([], pg:get_members(?FUNCTION_NAME, one)),
+ ?assertEqual([], rpc(Socket, pg, get_members, [?FUNCTION_NAME, one])),
+ stop_node(Peer, Socket),
+ ok.
+
+exchange(Config) when is_list(Config) ->
+ {Peer1, Socket1} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ {Peer2, Socket2} = spawn_node(?FUNCTION_NAME, exchange_second),
+ Pids10 = [rpc(Socket1, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)],
+ Pids2 = [rpc(Socket2, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)],
+ Pids11 = [rpc(Socket1, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)],
+ %% kill first 3 pids from node1
+ {PidsToKill, Pids1} = lists:split(3, Pids10),
+
+ ?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pids10])),
+ sync({?FUNCTION_NAME, Peer1}),
+ ?assertEqual(lists:sort(Pids10), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+ [rpc(Socket1, ?MODULE, stop_proc, [Pid]) || Pid <- PidsToKill],
+ sync(?FUNCTION_NAME),
+ sync({?FUNCTION_NAME, Peer1}),
+
+ Pids = lists:sort(Pids1 ++ Pids2 ++ Pids11),
+ ?assert(lists:all(fun erlang:is_pid/1, Pids)),
+
+ net_kernel:disconnect(Peer1),
+ net_kernel:disconnect(Peer2),
+
+ sync(?FUNCTION_NAME),
+ ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+
+ [?assertEqual(ok, rpc(Socket2, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid])) || Pid <- Pids2],
+ [?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, second, Pid])) || Pid <- Pids11],
+ ?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, third, Pids11])),
+ %% rejoin
+ ?assertEqual(true, net_kernel:connect_node(Peer1)),
+ ?assertEqual(true, net_kernel:connect_node(Peer2)),
+ %% need to sleep longer to ensure both nodes made the exchange
+ sync(?FUNCTION_NAME),
+ sync({?FUNCTION_NAME, Peer1}),
+ sync({?FUNCTION_NAME, Peer2}),
+ ?assertEqual(Pids, lists:sort(pg:get_members(?FUNCTION_NAME, second) ++ pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+ ?assertEqual(lists:sort(Pids11), lists:sort(pg:get_members(?FUNCTION_NAME, third))),
+
+ {Left, Stay} = lists:split(3, Pids11),
+ ?assertEqual(ok, rpc(Socket1, pg, leave, [?FUNCTION_NAME, third, Left])),
+ sync({?FUNCTION_NAME, Peer1}),
+ sync(?FUNCTION_NAME),
+ ?assertEqual(lists:sort(Stay), lists:sort(pg:get_members(?FUNCTION_NAME, third))),
+ ?assertEqual(not_joined, rpc(Socket1, pg, leave, [?FUNCTION_NAME, left, Stay])),
+ ?assertEqual(ok, rpc(Socket1, pg, leave, [?FUNCTION_NAME, third, Stay])),
+ sync({?FUNCTION_NAME, Peer1}),
+ sync(?FUNCTION_NAME),
+ ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, third))),
+ sync({?FUNCTION_NAME, Peer1}),
+ sync(?FUNCTION_NAME),
+
+ stop_node(Peer1, Socket1),
+ stop_node(Peer2, Socket2),
+ ok.
+
+nolocal(Config) when is_list(Config) ->
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ RemotePid = spawn(Peer, forever()),
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
+ ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ stop_node(Peer, Socket),
+ ok.
+
+double(Config) when is_list(Config) ->
+ Pid = erlang:spawn(forever()),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [Pid])),
+ ?assertEqual([Pid, Pid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ sync(?FUNCTION_NAME),
+ sync({?FUNCTION_NAME, Peer}),
+ ?assertEqual([Pid, Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
+ stop_node(Peer, Socket),
+ ok.
+
+scope_restart(Config) when is_list(Config) ->
+ Pid = erlang:spawn(forever()),
+ ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [Pid, Pid])),
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ RemotePid = spawn(Peer, forever()),
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
+ sync({?FUNCTION_NAME, Peer}),
+ ?assertEqual(lists:sort([RemotePid, Pid, Pid]), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+ %% stop scope locally, and restart
+ gen_server:stop(?FUNCTION_NAME),
+ pg:start(?FUNCTION_NAME),
+ %% ensure remote pids joined, local are missing
+ sync(?FUNCTION_NAME),
+ sync({?FUNCTION_NAME, Peer}),
+ sync(?FUNCTION_NAME),
+ ?assertEqual([RemotePid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
+ stop_node(Peer, Socket),
+ ok.
+
+missing_scope_join(Config) when is_list(Config) ->
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ ?assertEqual(ok, rpc:call(Peer, gen_server, stop, [?FUNCTION_NAME])),
+ RemotePid = spawn(Peer, forever()),
+ ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
+ ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
+ stop_node(Peer, Socket),
+ ok.
+
+disconnected_start(Config) when is_list(Config) ->
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ net_kernel:disconnect(Peer),
+ ?assertEqual(ok, rpc(Socket, gen_server, stop, [?FUNCTION_NAME])),
+ ?assertMatch({ok, _Pid}, rpc(Socket, pg, start,[?FUNCTION_NAME])),
+ ?assertEqual(ok, rpc(Socket, gen_server, stop, [?FUNCTION_NAME])),
+ RemotePid = rpc(Socket, erlang, spawn, [forever()]),
+ ?assert(is_pid(RemotePid)),
+ stop_node(Peer, Socket),
+ ok.
+
+forced_sync() ->
+ [{doc, "This test was added when lookup_element was erroneously used instead of lookup, crashing pg with badmatch, and it tests rare out-of-order sync operations"}].
+
+forced_sync(Config) when is_list(Config) ->
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ Pid = erlang:spawn(forever()),
+ RemotePid = spawn(Peer, forever()),
+ Expected = lists:sort([Pid, RemotePid]),
+ pg:join(?FUNCTION_NAME, one, Pid),
+
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, one, RemotePid])),
+ RemoteScopePid = rpc:call(Peer, erlang, whereis, [?FUNCTION_NAME]),
+ ?assert(is_pid(RemoteScopePid)),
+ %% hohoho, partition!
+ net_kernel:disconnect(Peer),
+ ?assertEqual(true, net_kernel:connect_node(Peer)),
+ %% now ensure sync happened
+ sync({?FUNCTION_NAME, Peer}),
+ sync(?FUNCTION_NAME),
+ ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))),
+ %% WARNING: this code uses pg as white-box, exploiting internals,
+ %% only to simulate broken 'sync'
+ %% Fake Groups: one should disappear, one should be replaced, one stays
+ %% This tests handle_sync function.
+ FakeGroups = [{one, [RemotePid, RemotePid]}, {?FUNCTION_NAME, [RemotePid, RemotePid]}],
+ gen_server:cast(?FUNCTION_NAME, {sync, RemoteScopePid, FakeGroups}),
+ %% ensure it is broken well enough
+ sync(?FUNCTION_NAME),
+ ?assertEqual(lists:sort([RemotePid, RemotePid]), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+ ?assertEqual(lists:sort([RemotePid, RemotePid, Pid]), lists:sort(pg:get_members(?FUNCTION_NAME, one))),
+ %% simulate force-sync via 'discover' - ask peer to send sync to us
+ {?FUNCTION_NAME, Peer} ! {discover, whereis(?FUNCTION_NAME)},
+ sync({?FUNCTION_NAME, Peer}),
+ sync(?FUNCTION_NAME),
+ ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))),
+ ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
+ %% and simulate extra sync
+ sync({?FUNCTION_NAME, Peer}),
+ sync(?FUNCTION_NAME),
+ ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))),
+
+ stop_node(Peer, Socket),
+ ok.
+
+group_leave(Config) when is_list(Config) ->
+ {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
+ RemotePid = erlang:spawn(Peer, forever()),
+ Total = lists:duplicate(16, RemotePid),
+ {Left, Remain} = lists:split(4, Total),
+ %% join 16 times!
+ ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, two, Total])),
+ ?assertEqual(ok, rpc:call(Peer, pg, leave, [?FUNCTION_NAME, two, Left])),
+
+ sync({?FUNCTION_NAME, Peer}),
+ sync(?FUNCTION_NAME),
+ ?assertEqual(Remain, pg:get_members(?FUNCTION_NAME, two)),
+ stop_node(Peer, Socket),
+ sync(?FUNCTION_NAME),
+ ?assertEqual([], pg:get_members(?FUNCTION_NAME, two)),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Test Helpers - start/stop additional Erlang nodes
+
+sync(GS) ->
+ _ = sys:log(GS, get).
+
+-define (LOCALHOST, {127, 0, 0, 1}).
+
+%% @doc Kills process Pid and waits for it to exit using monitor,
+%% and yields after (for 1 ms).
+-spec stop_proc(pid()) -> ok.
+stop_proc(Pid) ->
+ monitor(process, Pid),
+ erlang:exit(Pid, kill),
+ receive
+ {'DOWN', _MRef, process, Pid, _Info} ->
+ timer:sleep(1)
+ end.
+
+%% @doc Executes remote call on the node via TCP socket
+%% Used when dist connection is not available, or
+%% when it's undesirable to use one.
+-spec rpc(gen_tcp:socket(), module(), atom(), [term()]) -> term().
+rpc(Sock, M, F, A) ->
+ ok = gen_tcp:send(Sock, term_to_binary({call, M, F, A})),
+ inet:setopts(Sock, [{active, once}]),
+ receive
+ {tcp, Sock, Data} ->
+ case binary_to_term(Data) of
+ {ok, Val} ->
+ Val;
+ {error, Error} ->
+ {badrpc, Error}
+ end;
+ {tcp_closed, Sock} ->
+ error(closed)
+ end.
+
+%% @doc starts peer node on this host.
+%% Returns spawned node name, and a gen_tcp socket to talk to it using ?MODULE:rpc.
+-spec spawn_node(Scope :: atom(), Name :: atom()) -> {node(), gen_tcp:socket()}.
+spawn_node(Scope, Name) ->
+ Self = self(),
+ Controller = erlang:spawn(?MODULE, controller, [Name, Scope, Self]),
+ receive
+ {'$node_started', Node, Port} ->
+ {ok, Socket} = gen_tcp:connect(?LOCALHOST, Port, [{active, false}, {mode, binary}, {packet, 4}]),
+ Controller ! {socket, Socket},
+ {Node, Socket};
+ Other ->
+ error({start_node, Name, Other})
+ after 60000 ->
+ error({start_node, Name, timeout})
+ end.
+
+%% @private
+-spec controller(atom(), atom(), pid()) -> ok.
+controller(Name, Scope, Self) ->
+ Pa = filename:dirname(code:which(?MODULE)),
+ Pa2 = filename:dirname(code:which(pg)),
+ Args = lists:concat(["-setcookie ", erlang:get_cookie(),
+ " -connect_all false -kernel dist_auto_connect never -noshell -pa ", Pa, " -pa ", Pa2]),
+ {ok, Node} = test_server:start_node(Name, peer, [{args, Args}]),
+ case rpc:call(Node, ?MODULE, control, [Scope], 5000) of
+ {badrpc, nodedown} ->
+ Self ! {badrpc, Node},
+ ok;
+ {Port, _PgPid} ->
+ Self ! {'$node_started', Node, Port},
+ controller_wait()
+ end.
+
+controller_wait() ->
+ Port =
+ receive
+ {socket, Port0} ->
+ Port0
+ end,
+ MRef = monitor(port, Port),
+ receive
+ {'DOWN', MRef, port, Port, _Info} ->
+ ok
+ end.
+
+%% @doc Stops the node previously started with spawn_node,
+%% and also closes the RPC socket.
+-spec stop_node(node(), gen_tcp:socket()) -> true.
+stop_node(Node, Socket) when Node =/= node() ->
+ true = test_server:stop_node(Node),
+ Socket =/= undefined andalso gen_tcp:close(Socket),
+ true.
+
+forever() ->
+ fun() -> receive after infinity -> ok end end.
+
+
+-spec control(Scope :: atom()) -> {Port :: integer(), pid()}.
+control(Scope) ->
+ Control = self(),
+ erlang:spawn(fun () -> server(Control, Scope) end),
+ receive
+ {port, Port, PgPid} ->
+ {Port, PgPid};
+ Other ->
+ error({error, Other})
+ end.
+
+server(Control, Scope) ->
+ try
+ {ok, Pid} = if Scope =:= undefined -> {ok, undefined}; true -> pg:start(Scope) end,
+ {ok, Listen} = gen_tcp:listen(0, [{mode, binary}, {packet, 4}, {ip, ?LOCALHOST}]),
+ {ok, Port} = inet:port(Listen),
+ Control ! {port, Port, Pid},
+ {ok, Sock} = gen_tcp:accept(Listen),
+ server_loop(Sock)
+ catch
+ Class:Reason:Stack ->
+ Control ! {error, {Class, Reason, Stack}}
+ end.
+
+server_loop(Sock) ->
+ inet:setopts(Sock, [{active, once}]),
+ receive
+ {tcp, Sock, Data} ->
+ {call, M, F, A} = binary_to_term(Data),
+ Ret =
+ try
+ erlang:apply(M, F, A) of
+ Res ->
+ {ok, Res}
+ catch
+ exit:Reason ->
+ {error, {'EXIT', Reason}};
+ error:Reason ->
+ {error, {'EXIT', Reason}}
+ end,
+ ok = gen_tcp:send(Sock, term_to_binary(Ret)),
+ server_loop(Sock);
+ {tcp_closed, Sock} ->
+ erlang:halt(1)
+ end.
diff --git a/lib/runtime_tools/src/observer_backend.erl b/lib/runtime_tools/src/observer_backend.erl
index f8eb380c02..e38757b939 100644
--- a/lib/runtime_tools/src/observer_backend.erl
+++ b/lib/runtime_tools/src/observer_backend.erl
@@ -766,6 +766,7 @@ sys_tables() ->
mnesia_gvar, mnesia_stats,
% mnesia_transient_decision,
pg2_table,
+ pg,
queue,
schema,
shell_records,
@@ -777,7 +778,7 @@ sys_tables() ->
sys_processes() ->
[auth, code_server, global_name_server, inet_db,
- mnesia_recover, net_kernel, timer_server, wxe_master].
+ mnesia_recover, net_kernel, pg, timer_server, wxe_master].
mnesia_tables() ->
[ir_AliasDef, ir_ArrayDef, ir_AttributeDef, ir_ConstantDef,
diff --git a/system/doc/design_principles/distributed_applications.xml b/system/doc/design_principles/distributed_applications.xml
index a1a0149eb5..451c90a8dc 100644
--- a/system/doc/design_principles/distributed_applications.xml
+++ b/system/doc/design_principles/distributed_applications.xml
@@ -45,7 +45,7 @@
addressing mechanism is required to ensure that it can be
addressed by other applications, regardless on which node it
currently executes. This issue is not addressed here, but the
- <c>global</c> or <c>pg2</c> modules in Kernel
+ <c>global</c> or <c>pg</c> modules in Kernel
can be used for this purpose.</p>
</section>
diff --git a/system/doc/tutorial/distribution.xml b/system/doc/tutorial/distribution.xml
index b489410841..b337dc59b1 100644
--- a/system/doc/tutorial/distribution.xml
+++ b/system/doc/tutorial/distribution.xml
@@ -59,7 +59,7 @@
<item>global_group - Grouping nodes to global name registration groups.</item>
<item>net_adm - Various net administration routines.</item>
<item>net_kernel - Networking kernel.</item>
- <item>pg2 - Distributed named process groups.</item>
+ <item>pg - Distributed named process groups.</item>
<item>pool - Load distribution facility.</item>
<item>slave - Functions for starting and controlling slave nodes.</item>
</list>
diff --git a/system/doc/tutorial/overview.xml b/system/doc/tutorial/overview.xml
index bd652b1e4b..7f38e806d2 100644
--- a/system/doc/tutorial/overview.xml
+++ b/system/doc/tutorial/overview.xml
@@ -64,7 +64,7 @@
(describes the BIFs)</item>
<item><seealso marker="kernel:global">global</seealso> manual page in Kernel</item>
<item><seealso marker="kernel:net_adm">net_adm</seealso> manual page in Kernel</item>
- <item><seealso marker="kernel:pg2">pg2</seealso> manual page in Kernel</item>
+ <item><seealso marker="kernel:pg">pg</seealso> manual page in Kernel</item>
<item><seealso marker="kernel:rpc">rpc</seealso> manual page in Kernel</item>
<item><seealso marker="stdlib:pool">pool</seealso> manual page in STDLIB</item>
<item><seealso marker="stdlib:slave">slave</seealso> manual page in STDLIB</item>