From 9279f3ce3e42444191208b8e6f75377b2895adf1 Mon Sep 17 00:00:00 2001 From: Maxim Fedorov Date: Sat, 25 Jan 2020 19:05:41 -0800 Subject: pg: distributed named process groups Replacement for pg2 module. Differences (compared to pg2): * non-existent and empty group treated the same (empty list of pids), thus create/1 and delete/1 have no effect (and not implemented). which_groups() return only non-empty groups * no cluster lock required, and no dependency on global * all join/leave operations require local process (it's not possible to join a process from a different node) * multi-join: join/leave several processes with a single call Empty groups are not supported: unlike a process, group does not have originating node. So it's possible that during net split one node deletes the group, that still exists for another partition. pg2 will re-create deleted group as soon as net split converges, which is quite unexpected. Process groups can be organised into multiple scopes. Scopes are completely independent of each other. A process may join any number of groups in any number of scopes. Scopes are designed to decouple single mesh into a set of overlay networks, reducing amount of traffic required to propagate group membership information. 
--- lib/kernel/doc/src/Makefile | 1 + lib/kernel/doc/src/kernel_app.xml | 10 + lib/kernel/doc/src/pg.xml | 189 +++++++ lib/kernel/doc/src/ref_man.xml | 1 + lib/kernel/doc/src/specs.xml | 1 + lib/kernel/src/Makefile | 1 + lib/kernel/src/kernel.app.src | 2 + lib/kernel/src/kernel.erl | 19 +- lib/kernel/src/pg.erl | 507 +++++++++++++++++ lib/kernel/test/Makefile | 1 + lib/kernel/test/pg_SUITE.erl | 619 +++++++++++++++++++++ lib/runtime_tools/src/observer_backend.erl | 3 +- .../design_principles/distributed_applications.xml | 2 +- system/doc/tutorial/distribution.xml | 2 +- system/doc/tutorial/overview.xml | 2 +- 15 files changed, 1353 insertions(+), 7 deletions(-) create mode 100644 lib/kernel/doc/src/pg.xml create mode 100644 lib/kernel/src/pg.erl create mode 100644 lib/kernel/test/pg_SUITE.erl diff --git a/lib/kernel/doc/src/Makefile b/lib/kernel/doc/src/Makefile index fe3dc9dab5..7c0f5b79be 100644 --- a/lib/kernel/doc/src/Makefile +++ b/lib/kernel/doc/src/Makefile @@ -67,6 +67,7 @@ XML_REF3_FILES = application.xml \ net_adm.xml \ net_kernel.xml \ os.xml \ + pg.xml \ pg2.xml \ rpc.xml \ seq_trace.xml \ diff --git a/lib/kernel/doc/src/kernel_app.xml b/lib/kernel/doc/src/kernel_app.xml index 7f9609d5c1..7ad3d15cd6 100644 --- a/lib/kernel/doc/src/kernel_app.xml +++ b/lib/kernel/doc/src/kernel_app.xml @@ -414,6 +414,15 @@ MaxT = TickTime + TickTime / 4 using this service.

Defaults to false.

+ start_pg = true | false + + +

Starts the default pg scope server (see + pg(3)) if + the parameter is true. This parameter is to be set to + true in an embedded system that uses this service.

+

Defaults to false.
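
As an illustration of the start_pg parameter described above, a minimal sketch of a configuration file entry follows. It assumes the system is started with a sys.config style file; that file name and layout are assumptions for illustration and are not part of this patch.

```erlang
%% sys.config (hypothetical file for illustration)
%% Asks the Kernel application to start the default pg scope at boot.
[
 {kernel, [
   {start_pg, true}
 ]}
].
```

The same setting can presumably be passed on the command line as `erl -kernel start_pg true`, which is the usual way to set Kernel parameters.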

+
start_pg2 = true | false @@ -556,6 +565,7 @@ erl -kernel logger '[{handler,default,logger_std_h,#{formatter=>{logger_formatte logger(3), net_kernel(3), os(3), + pg(3), pg2(3), rpc(3), seq_trace(3), diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml new file mode 100644 index 0000000000..359fbcf72a --- /dev/null +++ b/lib/kernel/doc/src/pg.xml @@ -0,0 +1,189 @@ + + + + + + +
+ + 20202020 + Maxim Fedorov, WhatsApp Inc. + + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + + pg + maximfca@gmail.com + + + + + + A + pg.xml +
+ pg + Distributed named process groups. + +

This module implements process groups. A message can be sent + to one, some, or all group members.

+

A group of processes can be accessed by a common name. For + example, if there is a group named foobar, there can be a + set of processes (which can be located on different nodes) that + are all members of the group foobar. There are no special + functions for sending a message to the group. Instead, client + functions are to be written with the functions + get_members/1 and + get_local_members/1 + to determine which processes are members of the group. + Then the message can be sent to one or more group members.
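
Since there is no built-in send function, a client helper could look roughly like the sketch below. The module name pg_send_example and both helper functions are hypothetical; only pg:get_members/1 and pg:get_local_members/1 come from this module, and the default scope is assumed.

```erlang
-module(pg_send_example).
-export([send_all/2, send_any/2]).

%% Sends Msg to every member of Group in the default scope.
%% Returns the number of processes the message was sent to.
send_all(Group, Msg) ->
    Members = pg:get_members(Group),
    lists:foreach(fun(Pid) -> Pid ! Msg end, Members),
    length(Members).

%% Sends Msg to a single member, preferring a process on the local node.
send_any(Group, Msg) ->
    case pg:get_local_members(Group) of
        [Local | _] ->
            Local ! Msg,
            {ok, Local};
        [] ->
            case pg:get_members(Group) of
                [Remote | _] ->
                    Remote ! Msg,
                    {ok, Remote};
                [] ->
                    {error, no_members}
            end
    end.
```

Because membership is only eventually consistent, a helper like this may send to a process that has just exited; callers that need delivery guarantees have to add their own acknowledgement.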

+

If a member terminates, it is automatically removed from the group.

+ +

A process may join multiple groups. It may join the same group multiple times. + Only processes running on the local node can be joined to a group. +

+ +

Process Groups implement strong eventual consistency. + Unlike pg2, which provides + strong ordering guarantees, the Process Groups membership view may temporarily + diverge. For example, when processes on node1 and node2 + join concurrently, node3 and node4 may receive updates in + a different order.

+ +

The membership view is not transitive. If node1 is not directly + connected to node2, they will not see each other's groups. But if + both are connected to node3, node3 will have the full view.

+ +

Groups are automatically created when any process joins, + and are removed when all processes leave the group. A non-existent group is + considered empty (containing no processes).

+ +

Process groups can be organised into multiple scopes. Scopes are + completely independent of each other. A process may join any + number of groups in any number of scopes. Scopes are designed to + decouple a single mesh into a set of overlay networks, reducing + the amount of traffic required to propagate group membership + information. The default scope pg is started automatically + when kernel(6) + is configured to do so. +
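
The independence of scopes can be sketched as follows. The module name pg_scope_example, the scope name example_scope and the group name workers are hypothetical. The sketch assumes the calling node is able to start a scope process (the test suite in this patch starts distribution first, so a distributed node may be required).

```erlang
-module(pg_scope_example).
-export([demo/0]).

%% Starts a separate scope, joins the calling process to a group in it,
%% and reads the same group name from the default scope for comparison.
demo() ->
    Scope = example_scope,
    {ok, ScopePid} = pg:start(Scope),
    ok = pg:join(Scope, workers, self()),
    %% Membership as seen in the additional scope.
    InScope = pg:get_members(Scope, workers),
    %% The default scope is a different overlay network, so joining
    %% in example_scope does not add the caller here.
    InDefault = pg:get_members(workers),
    ok = pg:leave(Scope, workers, self()),
    ok = gen_server:stop(ScopePid),
    {InScope, InDefault}.
```

Stopping the scope with gen_server:stop/1 mirrors what the test suite does in end_per_testcase/2; an application would normally place pg:start_link/1 under its own supervisor instead.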

+ +

+ The scope name is used to register the scope process locally, and to name an ETS table. + If another process is already registered under this name, or an ETS table with this name + already exists, the scope fails to start.

+

Local membership is not preserved if the scope process exits and + restarts. This behaviour differs from + pg2, which recovers + local membership from remote nodes. +

+ +
+ + + + +

The identifier of a process group.

+
+
+ + + + + + Start the default pg scope. + +

Starts the default pg scope within a supervision tree. + Kernel may be configured to do this automatically; see the + kernel(6) + configuration manual.

+
+
+ + + + + Start additional scope. + +

Starts an additional scope.

+
+
+ + + + + Join a process or a list of processes to a group. + +

Joins a single process or multiple processes to the + group Name. A process can join a group many times and + must then leave the group the same number of times.

+

PidOrPids may contain the same process multiple times.

+
+
+ + + + + Make a process leave a group. + +

Makes the process PidOrPids leave the group Name. + If the process is not a member of the group, not_joined is + returned.

+

When a list of processes is passed as PidOrPids, the function + returns not_joined only if none of the processes in the list + are joined.
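
The join and leave counting rules can be sketched as below. The module name pg_leave_example, the scope name example_leave_scope and the group name g are hypothetical. As with any scope, this assumes the node is able to start a scope process.

```erlang
-module(pg_leave_example).
-export([demo/0]).

%% Joins the calling process twice, then leaves one membership at a time.
demo() ->
    Scope = example_leave_scope,
    {ok, ScopePid} = pg:start(Scope),
    Self = self(),
    %% The same pid listed twice results in two memberships.
    ok = pg:join(Scope, g, [Self, Self]),
    %% The first leave removes one membership, one remains.
    ok = pg:leave(Scope, g, Self),
    [Self] = pg:get_members(Scope, g),
    %% The second leave removes the last membership and the group.
    ok = pg:leave(Scope, g, Self),
    [] = pg:get_members(Scope, g),
    %% Leaving a group the process is not a member of.
    not_joined = pg:leave(Scope, g, Self),
    ok = gen_server:stop(ScopePid),
    ok.
```

Each pattern match doubles as an assertion, so the function crashes with a badmatch error if the behaviour differs from what the documentation above describes.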

+
+
+ + + + + Return all local processes in a group. + +

Returns all processes running on the local node in the + group Name. Processes are returned in no specific order. + This function is optimised for speed. +

+
+
+ + + + + Return all processes in a group. + +

Returns all processes in the group Name. + Processes are returned in no specific order. + This function is optimised for speed.

+
+
+ + + + + Return a list of all known groups. + +

Returns a list of all known groups.

+
+
+ +
+ +
+ See Also +

kernel(6)

+
+
+ diff --git a/lib/kernel/doc/src/ref_man.xml b/lib/kernel/doc/src/ref_man.xml index 9df51dee22..2e3a749949 100644 --- a/lib/kernel/doc/src/ref_man.xml +++ b/lib/kernel/doc/src/ref_man.xml @@ -64,6 +64,7 @@ + diff --git a/lib/kernel/doc/src/specs.xml b/lib/kernel/doc/src/specs.xml index 9e258910db..27bba6e78f 100644 --- a/lib/kernel/doc/src/specs.xml +++ b/lib/kernel/doc/src/specs.xml @@ -30,6 +30,7 @@ + diff --git a/lib/kernel/src/Makefile b/lib/kernel/src/Makefile index 2d2b84c206..3b93d84e02 100644 --- a/lib/kernel/src/Makefile +++ b/lib/kernel/src/Makefile @@ -128,6 +128,7 @@ MODULES = \ net_adm \ net_kernel \ os \ + pg \ pg2 \ ram_file \ rpc \ diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src index 8456d2a640..08ad4ed7da 100644 --- a/lib/kernel/src/kernel.app.src +++ b/lib/kernel/src/kernel.app.src @@ -103,6 +103,7 @@ inet_tcp, inet_udp, inet_sctp, + pg, pg2, raw_file_io, raw_file_io_compressed, @@ -143,6 +144,7 @@ ddll_server, erl_epmd, inet_db, + pg, pg2]}, {applications, []}, {env, [{logger_level, notice}, diff --git a/lib/kernel/src/kernel.erl b/lib/kernel/src/kernel.erl index 8877ceea8e..83f3fbecd5 100644 --- a/lib/kernel/src/kernel.erl +++ b/lib/kernel/src/kernel.erl @@ -64,7 +64,7 @@ config_change(Changed, New, Removed) -> %%% (file,code, | erl_dist (A)| | safe_sup (1)| %%% rpc, ...) ------------- ------------- %%% | | -%%% (net_kernel, (disk_log, pg2, +%%% (net_kernel, (disk_log, pg, %%% auth, ...) ...) %%% %%% The rectangular boxes are supervisors. All supervisors except @@ -180,7 +180,7 @@ init(safe) -> Boot = start_boot_server(), DiskLog = start_disk_log(), - Pg2 = start_pg2(), + Pg = start_pg2() ++ start_pg(), %% Run the on_load handlers for all modules that have been %% loaded so far. Running them at this point means that @@ -188,7 +188,7 @@ init(safe) -> %% (and in particular call code:priv_dir/1 or code:lib_dir/1). init:run_on_load_handlers(), - {ok, {SupFlags, Boot ++ DiskLog ++ Pg2}}. 
+ {ok, {SupFlags, Boot ++ DiskLog ++ Pg}}. start_distribution() -> Rpc = #{id => rex, @@ -279,6 +279,19 @@ start_disk_log() -> [] end. +start_pg() -> + case application:get_env(kernel, start_pg) of + {ok, true} -> + [#{id => pg, + start => {pg, start_link, []}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [pg]}]; + _ -> + [] + end. + start_pg2() -> case application:get_env(kernel, start_pg2) of {ok, true} -> diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl new file mode 100644 index 0000000000..0668dd1f79 --- /dev/null +++ b/lib/kernel/src/pg.erl @@ -0,0 +1,507 @@ +%% +%% +%% Copyright WhatsApp Inc. and its affiliates. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% +%%------------------------------------------------------------------- +%% +%% @author Maxim Fedorov +%% Process Groups with eventually consistent membership. +%% +%% Differences (compared to pg2): +%% * non-existent and empty group treated the same (empty list of pids), +%% thus create/1 and delete/1 have no effect (and not implemented). +%% which_groups() return only non-empty groups +%% * no cluster lock required, and no dependency on global +%% * all join/leave operations require local process (it's not possible to join +%% a process from a different node) +%% * multi-join: join/leave several processes with a single call +%% +%% Why empty groups are not supported: +%% Unlike a process, group does not have originating node. 
So it's possible +%% that during net split one node deletes the group, that still exists for +%% another partition. pg2 will recover the group, as soon as net +%% split converges, which is quite unexpected. +%% +%% Exchange protocol: +%% * when pg process starts, it broadcasts +%% 'discover' message to all nodes in the cluster +%% * when pg server receives 'discover', it responds with 'sync' message +%% containing list of groups with all local processes, and starts to +%% monitor process that sent 'discover' message (assuming it is a part +%% of an overlay network) +%% * every pg process monitors 'nodeup' messages to attempt discovery for +%% nodes that are (re)joining the cluster +%% +%% Leave/join operations: +%% * processes joining the group are monitored on the local node +%% * when process exits (without leaving groups prior to exit), local +%% instance of pg scoped process detects this and sends 'leave' to +%% all nodes in an overlay network (no remote monitoring done) +%% * all leave/join operations are serialised through pg server process +%% +-module(pg). + +%% API: default scope +-export([ + start_link/0, + + join/2, + leave/2, + get_members/1, + get_local_members/1, + which_groups/0, + which_local_groups/0 +]). + +%% Scoped API: overlay networks +-export([ + start/1, + start_link/1, + + join/3, + leave/3, + get_members/2, + get_local_members/2, + which_groups/1, + which_local_groups/1 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +%% Types +-type group() :: any(). + +%% Default scope started by kernel app +-define(DEFAULT_SCOPE, ?MODULE). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server and links it to calling process. +%% Uses default scope, which is the same as as the module name. +-spec start_link() -> {ok, pid()} | {error, any()}. +start_link() -> + start_link(?DEFAULT_SCOPE). 
+ +%% @doc +%% Starts the server outside of supervision hierarchy. +-spec start(Scope :: atom()) -> {ok, pid()} | {error, any()}. +start(Scope) when is_atom(Scope) -> + gen_server:start({local, Scope}, ?MODULE, [Scope], []). + +%% @doc +%% Starts the server and links it to calling process. +%% Scope name is supplied. +-spec start_link(Scope :: atom()) -> {ok, pid()} | {error, any()}. +start_link(Scope) when is_atom(Scope) -> + gen_server:start_link({local, Scope}, ?MODULE, [Scope], []). + +%%-------------------------------------------------------------------- +%% @doc +%% Joins a single process +%% Group is created automatically. +%% Process must be local to this node. +-spec join(Group :: group(), PidOrPids :: pid() | [pid()]) -> ok. +join(Group, PidOrPids) -> + join(?DEFAULT_SCOPE, Group, PidOrPids). + +-spec join(Scope :: atom(), Group :: group(), PidOrPids :: pid() | [pid()]) -> ok. +join(Scope, Group, PidOrPids) -> + Node = node(), + is_list(PidOrPids) andalso [error({nolocal, Pid}) || Pid <- PidOrPids, node(Pid) =/= Node orelse not is_pid(Pid)], + gen_server:call(Scope, {join_local, Group, PidOrPids}, infinity). + +%%-------------------------------------------------------------------- +%% @doc +%% Single process leaving the group. +%% Process must be local to this node. +-spec leave(Group :: group(), PidOrPids :: pid() | [pid()]) -> ok. +leave(Group, PidOrPids) -> + leave(?DEFAULT_SCOPE, Group, PidOrPids). + +-spec leave(Scope :: atom(), Group :: group(), Pid :: pid() | [pid()]) -> ok | not_joined. +leave(Scope, Group, PidOrPids) -> + Node = node(), + is_list(PidOrPids) andalso [error({nolocal, Pid}) || Pid <- PidOrPids, node(Pid) =/= Node orelse not is_pid(Pid)], + gen_server:call(Scope, {leave_local, Group, PidOrPids}, infinity). + +%%-------------------------------------------------------------------- +%% @doc +%% Returns all processes in a group +-spec get_members(Group :: group()) -> [pid()]. +get_members(Group) -> + get_members(?DEFAULT_SCOPE, Group). 
+ +-spec get_members(Scope :: atom(), Group :: group()) -> [pid()]. +get_members(Scope, Group) -> + try + ets:lookup_element(Scope, Group, 2) + catch + error:badarg -> + [] + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Returns processes in a group, running on local node. +-spec get_local_members(Group :: group()) -> [pid()]. +get_local_members(Group) -> + get_local_members(?DEFAULT_SCOPE, Group). + +-spec get_local_members(Scope :: atom(), Group :: group()) -> [pid()]. +get_local_members(Scope, Group) -> + try + ets:lookup_element(Scope, Group, 3) + catch + error:badarg -> + [] + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Returns a list of all known groups. +-spec which_groups() -> [Group :: group()]. +which_groups() -> + which_groups(?DEFAULT_SCOPE). + +-spec which_groups(Scope :: atom()) -> [Group :: group()]. +which_groups(Scope) when is_atom(Scope) -> + [G || [G] <- ets:match(Scope, {'$1', '_', '_'})]. + +%%-------------------------------------------------------------------- +%% @private +%% Returns a list of groups that have any local processes joined. +-spec which_local_groups() -> [Group :: group()]. +which_local_groups() -> + which_local_groups(?DEFAULT_SCOPE). + +-spec which_local_groups(Scope :: atom()) -> [Group :: group()]. +which_local_groups(Scope) when is_atom(Scope) -> + ets:select(Scope, [{{'$1', '_', '$2'}, [{'=/=', '$2', []}], ['$1']}]). 
+ +%%-------------------------------------------------------------------- +%% Internal implementation + +%% gen_server implementation +-record(state, { + %% ETS table name, and also the registered process name (self()) + scope :: atom(), + %% monitored local processes and groups they joined + monitors = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}}, + %% remote node: scope process monitor and map of groups to pids for fast sync routine + nodes = #{} :: #{pid() => {reference(), #{group() => [pid()]}}} +}). + +-type state() :: #state{}. + +-spec init([Scope :: atom()]) -> {ok, state()}. +init([Scope]) -> + ok = net_kernel:monitor_nodes(true), + %% discover all nodes in the cluster + broadcast([{Scope, Node} || Node <- nodes()], {discover, self()}), + Scope = ets:new(Scope, [set, protected, named_table, {read_concurrency, true}]), + {ok, #state{scope = Scope}}. + +-spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()} + | {leave_local, Group :: group(), Pid :: pid()}, + From :: {pid(),Tag :: any()}, + State :: state()) -> {reply, ok | not_joined, state()}. + +handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) -> + NewMons = join_monitors(PidOrPids, Group, Monitors), + join_local_group(Scope, Group, PidOrPids), + broadcast(maps:keys(Nodes), {join, self(), Group, PidOrPids}), + {reply, ok, State#state{monitors = NewMons}}; + +handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) -> + case leave_monitors(PidOrPids, Group, Monitors) of + Monitors -> + {reply, not_joined, State}; + NewMons -> + leave_local_group(Scope, Group, PidOrPids), + broadcast(maps:keys(Nodes), {leave, self(), PidOrPids, [Group]}), + {reply, ok, State#state{monitors = NewMons}} + end; + +handle_call(_Request, _From, _S) -> + error(badarg). 
+ +-spec handle_cast( + {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]}, + State :: state()) -> {noreply, state()}. + +handle_cast({sync, Peer, Groups}, #state{scope = Scope, nodes = Nodes} = State) -> + {noreply, State#state{nodes = handle_sync(Scope, Peer, Nodes, Groups)}}; + +handle_cast(_, _State) -> + error(badarg). + +-spec handle_info( + {discover, Peer :: pid()} | + {join, Peer :: pid(), group(), pid() | [pid()]} | + {leave, Peer :: pid(), pid() | [pid()], [group()]} | + {'DOWN', reference(), process, pid(), term()} | + {nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}. + +%% remote pid or several pids joining the group +handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes} = State) -> + join_remote(Scope, Group, PidOrPids), + % store remote group => pids map for fast sync operation + {MRef, RemoteGroups} = maps:get(Peer, Nodes), + NewRemoteGroups = join_remote_map(Group, PidOrPids, RemoteGroups), + {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteGroups}}}}; + +%% remote pid leaving (multiple groups at once) +handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Nodes} = State) -> + _ = leave_remote(Scope, PidOrPids, Groups), + {MRef, RemoteMap} = maps:get(Peer, Nodes), + NewRemoteMap = lists:foldl( + fun (Group, Acc) -> + case maps:get(Group, Acc) of + PidOrPids -> + Acc; + [PidOrPids] -> + Acc; + Existing when is_pid(PidOrPids) -> + Acc#{Group => lists:delete(PidOrPids, Existing)}; + Existing -> + Acc#{Group => Existing-- PidOrPids} + end + end, RemoteMap, Groups), + {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteMap}}}}; + +%% we're being discovered, let's exchange! +handle_info({discover, Peer}, #state{scope = Scope, nodes = Nodes} = State) -> + gen_server:cast(Peer, {sync, self(), all_local_pids(Scope)}), + %% do we know who is looking for us? 
+ case maps:is_key(Peer, Nodes) of + true -> + {noreply, State}; + false -> + MRef = monitor(process, Peer), + erlang:send(Peer, {discover, self()}, [noconnect]), + {noreply, State#state{nodes = Nodes#{Peer => {MRef, #{}}}}} + end; + +%% handle local process exit +handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) when node(Pid) =:= node() -> + case maps:take(Pid, Monitors) of + error -> + %% this can only happen when leave request and 'DOWN' are in pg queue + {noreply, State}; + {{MRef, Groups}, NewMons} -> + [leave_local_group(Scope, Group, Pid) || Group <- Groups], + %% send update to all nodes + broadcast(maps:keys(Nodes), {leave, self(), Pid, Groups}), + {noreply, State#state{monitors = NewMons}} + end; + +%% handle remote node down or leaving overlay network +handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, nodes = Nodes} = State) -> + {{MRef, RemoteMap}, NewNodes} = maps:take(Pid, Nodes), + _ = maps:map(fun (Group, Pids) -> leave_remote(Scope, Pids, [Group]) end, RemoteMap), + {noreply, State#state{nodes = NewNodes}}; + +%% nodedown: ignore, and wait for 'DOWN' signal for monitored process +handle_info({nodedown, _Node}, State) -> + {noreply, State}; + +%% nodeup: discover if remote node participates in the overlay network +handle_info({nodeup, Node}, #state{scope = Scope} = State) -> + {Scope, Node} ! {discover, self()}, + {noreply, State}; + +handle_info(_Info, _State) -> + error(badarg). + +-spec terminate(Reason :: any(), State :: state()) -> true. +terminate(_Reason, #state{scope = Scope}) -> + true = ets:delete(Scope). + +%%-------------------------------------------------------------------- +%% Internal implementation + +%% Override all knowledge of the remote node with information it sends +%% to local node. Current implementation must do the full table scan +%% to remove stale pids (just as for 'nodedown'). 
+handle_sync(Scope, Peer, Nodes, Groups) -> + %% can't use maps:get() because it evaluates 'default' value first, + %% and in this case monitor() call has side effect. + {MRef, RemoteGroups} = + case maps:find(Peer, Nodes) of + error -> + {monitor(process, Peer), #{}}; + {ok, MRef0} -> + MRef0 + end, + %% sync RemoteMap and transform ETS table + _ = sync_groups(Scope, RemoteGroups, Groups), + Nodes#{Peer => {MRef, maps:from_list(Groups)}}. + +sync_groups(Scope, RemoteGroups, []) -> + %% leave all missing groups + [leave_remote(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)]; +sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) -> + case maps:take(Group, RemoteGroups) of + {Pids, NewRemoteGroups} -> + sync_groups(Scope, NewRemoteGroups, Tail); + {OldPids, NewRemoteGroups} -> + [{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group), + %% should be really rare... + AllNewPids = Pids ++ AllOldPids -- OldPids, + true = ets:insert(Scope, {Group, AllNewPids, LocalPids}), + sync_groups(Scope, NewRemoteGroups, Tail); + error -> + join_remote(Scope, Group, Pids), + sync_groups(Scope, RemoteGroups, Tail) + end. + +join_monitors(Pid, Group, Monitors) when is_pid(Pid) -> + case maps:find(Pid, Monitors) of + {ok, {MRef, Groups}} -> + maps:put(Pid, {MRef, [Group | Groups]}, Monitors); + error -> + MRef = erlang:monitor(process, Pid), + Monitors#{Pid => {MRef, [Group]}} + end; +join_monitors([], _Group, Monitors) -> + Monitors; +join_monitors([Pid | Tail], Group, Monitors) -> + join_monitors(Tail, Group, join_monitors(Pid, Group, Monitors)). 
+ +join_local_group(Scope, Group, Pid) when is_pid(Pid) -> + case ets:lookup(Scope, Group) of + [{Group, All, Local}] -> + ets:insert(Scope, {Group, [Pid | All], [Pid | Local]}); + [] -> + ets:insert(Scope, {Group, [Pid], [Pid]}) + end; +join_local_group(Scope, Group, Pids) -> + case ets:lookup(Scope, Group) of + [{Group, All, Local}] -> + ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local}); + [] -> + ets:insert(Scope, {Group, Pids, Pids}) + end. + +join_remote(Scope, Group, Pid) when is_pid(Pid) -> + case ets:lookup(Scope, Group) of + [{Group, All, Local}] -> + ets:insert(Scope, {Group, [Pid | All], Local}); + [] -> + ets:insert(Scope, {Group, [Pid], []}) + end; +join_remote(Scope, Group, Pids) -> + case ets:lookup(Scope, Group) of + [{Group, All, Local}] -> + ets:insert(Scope, {Group, Pids ++ All, Local}); + [] -> + ets:insert(Scope, {Group, Pids, []}) + end. + +join_remote_map(Group, Pid, RemoteGroups) when is_pid(Pid) -> + maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups); +join_remote_map(Group, Pids, RemoteGroups) -> + maps:update_with(Group, fun (List) -> Pids ++ List end, Pids, RemoteGroups). + +leave_monitors(Pid, Group, Monitors) when is_pid(Pid) -> + case maps:find(Pid, Monitors) of + {ok, {MRef, [Group]}} -> + erlang:demonitor(MRef), + maps:remove(Pid, Monitors); + {ok, {MRef, Groups}} -> + case lists:member(Group, Groups) of + true -> + maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Monitors); + false -> + Monitors + end; + _ -> + Monitors + end; +leave_monitors([], _Group, Monitors) -> + Monitors; +leave_monitors([Pid | Tail], Group, Monitors) -> + leave_monitors(Tail, Group, leave_monitors(Pid, Group, Monitors)). 
+ +leave_local_group(Scope, Group, Pid) when is_pid(Pid) -> + case ets:lookup(Scope, Group) of + [{Group, [Pid], [Pid]}] -> + ets:delete(Scope, Group); + [{Group, All, Local}] -> + ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}); + [] -> + %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing. + true + end; +leave_local_group(Scope, Group, Pids) -> + case ets:lookup(Scope, Group) of + [{Group, All, Local}] -> + case All -- Pids of + [] -> + ets:delete(Scope, Group); + NewAll -> + ets:insert(Scope, {Group, NewAll, Local -- Pids}) + end; + [] -> + true + end. + +leave_remote(Scope, Pid, Groups) when is_pid(Pid) -> + _ = [ + case ets:lookup(Scope, Group) of + [{Group, [Pid], []}] -> + ets:delete(Scope, Group); + [{Group, All, Local}] -> + ets:insert(Scope, {Group, lists:delete(Pid, All), Local}); + [] -> + true + end || + Group <- Groups]; +leave_remote(Scope, Pids, Groups) -> + _ = [ + case ets:lookup(Scope, Group) of + [{Group, All, Local}] -> + case All -- Pids of + [] when Local =:= [] -> + ets:delete(Scope, Group); + NewAll -> + ets:insert(Scope, {Group, NewAll, Local}) + end; + [] -> + true + end || + Group <- Groups]. + +all_local_pids(Scope) -> + %% selector: ets:fun2ms(fun({N,_,L}) when L =/=[] -> {N,L}end). + ets:select(Scope, [{{'$1','_','$2'},[{'=/=','$2',[]}],[{{'$1','$2'}}]}]). + +%% Works as gen_server:abcast(), but accepts a list of processes +%% instead of nodes list. +broadcast([], _Msg) -> + ok; +broadcast([Dest | Tail], Msg) -> + %% do not use 'nosuspend', as it will lead to missing + %% join/leave messages when dist buffer is full + erlang:send(Dest, Msg, [noconnect]), + broadcast(Tail, Msg). 
diff --git a/lib/kernel/test/Makefile b/lib/kernel/test/Makefile index bd1590ee8f..10c55907e9 100644 --- a/lib/kernel/test/Makefile +++ b/lib/kernel/test/Makefile @@ -85,6 +85,7 @@ MODULES= \ logger_test_lib \ net_SUITE \ os_SUITE \ + pg_SUITE \ pg2_SUITE \ seq_trace_SUITE \ wrap_log_reader_SUITE \ diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl new file mode 100644 index 0000000000..bdb7abe99d --- /dev/null +++ b/lib/kernel/test/pg_SUITE.erl @@ -0,0 +1,619 @@ +%% +%% +%% Copyright WhatsApp Inc. and its affiliates. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%%------------------------------------------------------------------- +%% @author Maxim Fedorov +%% Process Groups smoke test. +-module(pg_SUITE). +-author("maximfca@gmail.com"). + +%% Test server callbacks +-export([ + suite/0, + all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2, + stop_proc/1 +]). + +%% Test cases exports +-export([ + pg/0, pg/1, + errors/0, errors/1, + leave_exit_race/0, leave_exit_race/1, + single/0, single/1, + two/1, + thundering_herd/0, thundering_herd/1, + initial/1, + netsplit/1, + trisplit/1, + foursplit/1, + exchange/1, + nolocal/1, + double/1, + scope_restart/1, + missing_scope_join/1, + disconnected_start/1, + forced_sync/0, forced_sync/1, + group_leave/1 +]). + +-export([ + control/1, + controller/3 +]). + +-include_lib("common_test/include/ct.hrl"). 
+-include_lib("stdlib/include/assert.hrl"). + +suite() -> + [{timetrap, {seconds, 10}}]. + +init_per_suite(Config) -> + case erlang:is_alive() of + false -> + %% verify epmd running (otherwise next call fails) + (erl_epmd:names("localhost") =:= {error, address}) andalso ([] = os:cmd("epmd -daemon")), + %% start a random node name + NodeName = list_to_atom(lists:concat([atom_to_list(?MODULE), "_", os:getpid()])), + {ok, Pid} = net_kernel:start([NodeName, shortnames]), + [{distribution, Pid} | Config]; + true -> + Config + end. + +end_per_suite(Config) -> + is_pid(proplists:get_value(distribution, Config)) andalso net_kernel:stop(). + +init_per_testcase(TestCase, Config) -> + {ok, _Pid} = pg:start_link(TestCase), + Config. + +end_per_testcase(TestCase, _Config) -> + gen_server:stop(TestCase), + ok. + +all() -> + [{group, basic}, {group, cluster}, {group, performance}]. + +groups() -> + [ + {basic, [parallel], [errors, pg, leave_exit_race, single]}, + {performance, [sequential], [thundering_herd]}, + {cluster, [parallel], [two, initial, netsplit, trisplit, foursplit, + exchange, nolocal, double, scope_restart, missing_scope_join, + disconnected_start, forced_sync, group_leave]} + ]. + +%%-------------------------------------------------------------------- +%% TEST CASES + +pg() -> + [{doc, "This test must be names pg, to stay inline with default scope"}]. + +pg(_Config) -> + ?assertNotEqual(undefined, whereis(?FUNCTION_NAME)), %% ensure scope was started + ?assertEqual(ok, pg:join(?FUNCTION_NAME, self())), + ?assertEqual([self()], pg:get_local_members(?FUNCTION_NAME)), + ?assertEqual([?FUNCTION_NAME], pg:which_groups()), + ?assertEqual([?FUNCTION_NAME], pg:which_local_groups()), + ?assertEqual(ok, pg:leave(?FUNCTION_NAME, self())), + ?assertEqual([], pg:get_members(?FUNCTION_NAME)), + ?assertEqual([], pg:which_groups(?FUNCTION_NAME)), + ?assertEqual([], pg:which_local_groups(?FUNCTION_NAME)). 
+ +errors() -> + [{doc, "Tests that errors are handled as expected, for example pg server crashes when it needs to"}]. + +errors(_Config) -> + %% kill with 'info' and 'cast' + ?assertException(error, badarg, pg:handle_info(garbage, garbage)), + ?assertException(error, badarg, pg:handle_cast(garbage, garbage)), + %% kill with call + {ok, _Pid} = pg:start(second), + ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage, 100)). + +leave_exit_race() -> + [{doc, "Tests that pg correctly handles situation when leave and 'DOWN' messages are both in pg queue"}]. + +leave_exit_race(Config) when is_list(Config) -> + process_flag(priority, high), + [ + begin + Pid = spawn(fun () -> ok end), + pg:join(leave_exit_race, test, Pid), + pg:leave(leave_exit_race, test, Pid) + end + || _ <- lists:seq(1, 100)]. + +single() -> + [{doc, "Tests single node groups"}, {timetrap, {seconds, 5}}]. + +single(Config) when is_list(Config) -> + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [self(), self()])), + ?assertEqual([self(), self(), self()], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + ?assertEqual([self(), self(), self()], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, '$missing$', self())), + ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, [self(), self()])), + ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, self())), + ?assertEqual([], pg:which_groups(?FUNCTION_NAME)), + ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + %% double + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())), + Pid = erlang:spawn(forever()), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)), + Expected = lists:sort([Pid, self()]), + ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, 
?FUNCTION_NAME))), + ?assertEqual(Expected, lists:sort(pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + + stop_proc(Pid), + sync(?FUNCTION_NAME), + ?assertEqual([self()], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, self())), + ok. + +two(Config) when is_list(Config) -> + {TwoPeer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + Pid = erlang:spawn(forever()), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)), + ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + %% first RPC must be serialised + sync({?FUNCTION_NAME, TwoPeer}), + ?assertEqual([Pid], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])), + ?assertEqual([], rpc:call(TwoPeer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])), + stop_proc(Pid), + %% again, must be serialised + sync(?FUNCTION_NAME), + ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + ?assertEqual([], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])), + + Pid2 = erlang:spawn(TwoPeer, forever()), + Pid3 = erlang:spawn(TwoPeer, forever()), + ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid2])), + ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid3])), + %% serialise through the *other* node + sync({?FUNCTION_NAME, TwoPeer}), + ?assertEqual(lists:sort([Pid2, Pid3]), + lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + %% stop the peer + stop_node(TwoPeer, Socket), + %% hope that 'nodedown' comes before we route our request + sync(?FUNCTION_NAME), + ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + ok. + +thundering_herd() -> + [{doc, "Thousands of overlay network nodes sending sync to us, and we time out!"}, {timetrap, {seconds, 5}}]. 
+ +thundering_herd(Config) when is_list(Config) -> + GroupCount = 10000, + SyncCount = 2000, + %% make up a large amount of groups + [pg:join(?FUNCTION_NAME, {group, Seq}, self()) || Seq <- lists:seq(1, GroupCount)], + %% initiate a few syncs - and those are really slow... + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + PeerPid = erlang:spawn(Peer, forever()), + PeerPg = rpc:call(Peer, erlang, whereis, [?FUNCTION_NAME], 1000), + %% WARNING: code below acts for white-box! %% WARNING + FakeSync = [{{group, 1}, [PeerPid, PeerPid]}], + [gen_server:cast(?FUNCTION_NAME, {sync, PeerPg, FakeSync}) || _ <- lists:seq(1, SyncCount)], + %% next call must not timetrap, otherwise test fails + pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self()), + stop_node(Peer, Socket). + +initial(Config) when is_list(Config) -> + Pid = erlang:spawn(forever()), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)), + ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + %% first RPC must be serialised + sync({?FUNCTION_NAME, Peer}), + ?assertEqual([Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])), + + ?assertEqual([], rpc:call(Peer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])), + stop_proc(Pid), + sync({?FUNCTION_NAME, Peer}), + ?assertEqual([], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])), + stop_node(Peer, Socket), + ok. + +netsplit(Config) when is_list(Config) -> + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + ?assertEqual(Peer, rpc(Socket, erlang, node, [])), %% just to test RPC + RemoteOldPid = erlang:spawn(Peer, forever()), + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, '$invisible', RemoteOldPid])), + %% hohoho, partition! 
+ net_kernel:disconnect(Peer), + ?assertEqual(Peer, rpc(Socket, erlang, node, [])), %% just to ensure RPC still works + RemotePid = rpc(Socket, erlang, spawn, [forever()]), + ?assertEqual([], rpc(Socket, erlang, nodes, [])), + ?assertEqual(ok, rpc(Socket, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), %% join - in a partition! + + ?assertEqual(ok, rpc(Socket, pg, leave, [?FUNCTION_NAME, '$invisible', RemoteOldPid])), + ?assertEqual(ok, rpc(Socket, pg, join, [?FUNCTION_NAME, '$visible', RemoteOldPid])), + ?assertEqual([RemoteOldPid], rpc(Socket, pg, get_local_members, [?FUNCTION_NAME, '$visible'])), + %% join locally too + LocalPid = erlang:spawn(forever()), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, LocalPid)), + + ?assertNot(lists:member(Peer, nodes())), %% should be no nodes in the cluster + + pong = net_adm:ping(Peer), + %% now ensure sync happened + Pids = lists:sort([RemotePid, LocalPid]), + sync({?FUNCTION_NAME, Peer}), + ?assertEqual(Pids, lists:sort(rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME]))), + ?assertEqual([RemoteOldPid], pg:get_members(?FUNCTION_NAME, '$visible')), + stop_node(Peer, Socket), + ok. 
+ +trisplit(Config) when is_list(Config) -> + {Peer, Socket1} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + _PeerPid1 = erlang:spawn(Peer, forever()), + PeerPid2 = erlang:spawn(Peer, forever()), + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, three, PeerPid2])), + net_kernel:disconnect(Peer), + ?assertEqual(true, net_kernel:connect_node(Peer)), + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, one, PeerPid2])), + %% now ensure sync happened + {Peer2, Socket2} = spawn_node(?FUNCTION_NAME, trisplit_second), + ?assertEqual(true, rpc:call(Peer2, net_kernel, connect_node, [Peer])), + ?assertEqual(lists:sort([node(), Peer]), lists:sort(rpc:call(Peer2, erlang, nodes, []))), + sync({?FUNCTION_NAME, Peer2}), + ?assertEqual([PeerPid2], rpc:call(Peer2, pg, get_members, [?FUNCTION_NAME, one])), + stop_node(Peer, Socket1), + stop_node(Peer2, Socket2), + ok. + +foursplit(Config) when is_list(Config) -> + Pid = erlang:spawn(forever()), + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, one, Pid)), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, two, Pid)), + PeerPid1 = spawn(Peer, forever()), + ?assertEqual(ok, pg:leave(?FUNCTION_NAME, one, Pid)), + ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, three, Pid)), + net_kernel:disconnect(Peer), + ?assertEqual(ok, rpc(Socket, ?MODULE, stop_proc, [PeerPid1])), + ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, three, Pid)), + ?assertEqual(true, net_kernel:connect_node(Peer)), + ?assertEqual([], pg:get_members(?FUNCTION_NAME, one)), + ?assertEqual([], rpc(Socket, pg, get_members, [?FUNCTION_NAME, one])), + stop_node(Peer, Socket), + ok. 
+ +exchange(Config) when is_list(Config) -> + {Peer1, Socket1} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + {Peer2, Socket2} = spawn_node(?FUNCTION_NAME, exchange_second), + Pids10 = [rpc(Socket1, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)], + Pids2 = [rpc(Socket2, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)], + Pids11 = [rpc(Socket1, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)], + %% kill first 3 pids from node1 + {PidsToKill, Pids1} = lists:split(3, Pids10), + + ?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pids10])), + sync({?FUNCTION_NAME, Peer1}), + ?assertEqual(lists:sort(Pids10), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + [rpc(Socket1, ?MODULE, stop_proc, [Pid]) || Pid <- PidsToKill], + sync(?FUNCTION_NAME), + sync({?FUNCTION_NAME, Peer1}), + + Pids = lists:sort(Pids1 ++ Pids2 ++ Pids11), + ?assert(lists:all(fun erlang:is_pid/1, Pids)), + + net_kernel:disconnect(Peer1), + net_kernel:disconnect(Peer2), + + sync(?FUNCTION_NAME), + ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + + [?assertEqual(ok, rpc(Socket2, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid])) || Pid <- Pids2], + [?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, second, Pid])) || Pid <- Pids11], + ?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, third, Pids11])), + %% rejoin + ?assertEqual(true, net_kernel:connect_node(Peer1)), + ?assertEqual(true, net_kernel:connect_node(Peer2)), + %% need to sleep longer to ensure both nodes made the exchange + sync(?FUNCTION_NAME), + sync({?FUNCTION_NAME, Peer1}), + sync({?FUNCTION_NAME, Peer2}), + ?assertEqual(Pids, lists:sort(pg:get_members(?FUNCTION_NAME, second) ++ pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + ?assertEqual(lists:sort(Pids11), lists:sort(pg:get_members(?FUNCTION_NAME, third))), + + {Left, Stay} = lists:split(3, Pids11), + ?assertEqual(ok, rpc(Socket1, pg, leave, [?FUNCTION_NAME, third, Left])), + 
sync({?FUNCTION_NAME, Peer1}), + sync(?FUNCTION_NAME), + ?assertEqual(lists:sort(Stay), lists:sort(pg:get_members(?FUNCTION_NAME, third))), + ?assertEqual(not_joined, rpc(Socket1, pg, leave, [?FUNCTION_NAME, left, Stay])), + ?assertEqual(ok, rpc(Socket1, pg, leave, [?FUNCTION_NAME, third, Stay])), + sync({?FUNCTION_NAME, Peer1}), + sync(?FUNCTION_NAME), + ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, third))), + sync({?FUNCTION_NAME, Peer1}), + sync(?FUNCTION_NAME), + + stop_node(Peer1, Socket1), + stop_node(Peer2, Socket2), + ok. + +nolocal(Config) when is_list(Config) -> + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + RemotePid = spawn(Peer, forever()), + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), + ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + stop_node(Peer, Socket), + ok. + +double(Config) when is_list(Config) -> + Pid = erlang:spawn(forever()), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)), + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [Pid])), + ?assertEqual([Pid, Pid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + sync(?FUNCTION_NAME), + sync({?FUNCTION_NAME, Peer}), + ?assertEqual([Pid, Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])), + stop_node(Peer, Socket), + ok. 
+ +scope_restart(Config) when is_list(Config) -> + Pid = erlang:spawn(forever()), + ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [Pid, Pid])), + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + RemotePid = spawn(Peer, forever()), + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), + sync({?FUNCTION_NAME, Peer}), + ?assertEqual(lists:sort([RemotePid, Pid, Pid]), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + %% stop scope locally, and restart + gen_server:stop(?FUNCTION_NAME), + pg:start(?FUNCTION_NAME), + %% ensure remote pids joined, local are missing + sync(?FUNCTION_NAME), + sync({?FUNCTION_NAME, Peer}), + sync(?FUNCTION_NAME), + ?assertEqual([RemotePid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), + stop_node(Peer, Socket), + ok. + +missing_scope_join(Config) when is_list(Config) -> + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + ?assertEqual(ok, rpc:call(Peer, gen_server, stop, [?FUNCTION_NAME])), + RemotePid = spawn(Peer, forever()), + ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), + ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), + stop_node(Peer, Socket), + ok. + +disconnected_start(Config) when is_list(Config) -> + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + net_kernel:disconnect(Peer), + ?assertEqual(ok, rpc(Socket, gen_server, stop, [?FUNCTION_NAME])), + ?assertMatch({ok, _Pid}, rpc(Socket, pg, start,[?FUNCTION_NAME])), + ?assertEqual(ok, rpc(Socket, gen_server, stop, [?FUNCTION_NAME])), + RemotePid = rpc(Socket, erlang, spawn, [forever()]), + ?assert(is_pid(RemotePid)), + stop_node(Peer, Socket), + ok. + +forced_sync() -> + [{doc, "This test was added when lookup_element was erroneously used instead of lookup, crashing pg with badmatch, and it tests rare out-of-order sync operations"}]. 
+ +forced_sync(Config) when is_list(Config) -> + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + Pid = erlang:spawn(forever()), + RemotePid = spawn(Peer, forever()), + Expected = lists:sort([Pid, RemotePid]), + pg:join(?FUNCTION_NAME, one, Pid), + + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, one, RemotePid])), + RemoteScopePid = rpc:call(Peer, erlang, whereis, [?FUNCTION_NAME]), + ?assert(is_pid(RemoteScopePid)), + %% hohoho, partition! + net_kernel:disconnect(Peer), + ?assertEqual(true, net_kernel:connect_node(Peer)), + %% now ensure sync happened + sync({?FUNCTION_NAME, Peer}), + sync(?FUNCTION_NAME), + ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))), + %% WARNING: this code uses pg as white-box, exploiting internals, + %% only to simulate broken 'sync' + %% Fake Groups: one should disappear, one should be replaced, one stays + %% This tests handle_sync function. + FakeGroups = [{one, [RemotePid, RemotePid]}, {?FUNCTION_NAME, [RemotePid, RemotePid]}], + gen_server:cast(?FUNCTION_NAME, {sync, RemoteScopePid, FakeGroups}), + %% ensure it is broken well enough + sync(?FUNCTION_NAME), + ?assertEqual(lists:sort([RemotePid, RemotePid]), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + ?assertEqual(lists:sort([RemotePid, RemotePid, Pid]), lists:sort(pg:get_members(?FUNCTION_NAME, one))), + %% simulate force-sync via 'discover' - ask peer to send sync to us + {?FUNCTION_NAME, Peer} ! {discover, whereis(?FUNCTION_NAME)}, + sync({?FUNCTION_NAME, Peer}), + sync(?FUNCTION_NAME), + ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))), + ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))), + %% and simulate extra sync + sync({?FUNCTION_NAME, Peer}), + sync(?FUNCTION_NAME), + ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))), + + stop_node(Peer, Socket), + ok. 
+ +group_leave(Config) when is_list(Config) -> + {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME), + RemotePid = erlang:spawn(Peer, forever()), + Total = lists:duplicate(16, RemotePid), + {Left, Remain} = lists:split(4, Total), + %% join 16 times! + ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, two, Total])), + ?assertEqual(ok, rpc:call(Peer, pg, leave, [?FUNCTION_NAME, two, Left])), + + sync({?FUNCTION_NAME, Peer}), + sync(?FUNCTION_NAME), + ?assertEqual(Remain, pg:get_members(?FUNCTION_NAME, two)), + stop_node(Peer, Socket), + sync(?FUNCTION_NAME), + ?assertEqual([], pg:get_members(?FUNCTION_NAME, two)), + ok. + +%%-------------------------------------------------------------------- +%% Test Helpers - start/stop additional Erlang nodes + +sync(GS) -> + _ = sys:log(GS, get). + +-define (LOCALHOST, {127, 0, 0, 1}). + +%% @doc Kills process Pid and waits for it to exit using monitor, +%% and yields after (for 1 ms). +-spec stop_proc(pid()) -> ok. +stop_proc(Pid) -> + monitor(process, Pid), + erlang:exit(Pid, kill), + receive + {'DOWN', _MRef, process, Pid, _Info} -> + timer:sleep(1) + end. + +%% @doc Executes remote call on the node via TCP socket +%% Used when dist connection is not available, or +%% when it's undesirable to use one. +-spec rpc(gen_tcp:socket(), module(), atom(), [term()]) -> term(). +rpc(Sock, M, F, A) -> + ok = gen_tcp:send(Sock, term_to_binary({call, M, F, A})), + inet:setopts(Sock, [{active, once}]), + receive + {tcp, Sock, Data} -> + case binary_to_term(Data) of + {ok, Val} -> + Val; + {error, Error} -> + {badrpc, Error} + end; + {tcp_closed, Sock} -> + error(closed) + end. + +%% @doc starts peer node on this host. +%% Returns spawned node name, and a gen_tcp socket to talk to it using ?MODULE:rpc. +-spec spawn_node(Scope :: atom(), Node :: atom()) -> {node(), gen_tcp:socket()}. 
+spawn_node(Scope, Name) -> + Self = self(), + Controller = erlang:spawn(?MODULE, controller, [Name, Scope, Self]), + receive + {'$node_started', Node, Port} -> + {ok, Socket} = gen_tcp:connect(?LOCALHOST, Port, [{active, false}, {mode, binary}, {packet, 4}]), + Controller ! {socket, Socket}, + {Node, Socket}; + Other -> + error({start_node, Name, Other}) + after 60000 -> + error({start_node, Name, timeout}) + end. + +%% @private +-spec controller(atom(), atom(), pid()) -> ok. +controller(Name, Scope, Self) -> + Pa = filename:dirname(code:which(?MODULE)), + Pa2 = filename:dirname(code:which(pg)), + Args = lists:concat(["-setcookie ", erlang:get_cookie(), + " -connect_all false -kernel dist_auto_connect never -noshell -pa ", Pa, " -pa ", Pa2]), + {ok, Node} = test_server:start_node(Name, peer, [{args, Args}]), + case rpc:call(Node, ?MODULE, control, [Scope], 5000) of + {badrpc, nodedown} -> + Self ! {badrpc, Node}, + ok; + {Port, _PgPid} -> + Self ! {'$node_started', Node, Port}, + controller_wait() + end. + +controller_wait() -> + Port = + receive + {socket, Port0} -> + Port0 + end, + MRef = monitor(port, Port), + receive + {'DOWN', MRef, port, Port, _Info} -> + ok + end. + +%% @doc Stops the node previously started with spawn_node, +%% and also closes the RPC socket. +-spec stop_node(node(), gen_tcp:socket()) -> true. +stop_node(Node, Socket) when Node =/= node() -> + true = test_server:stop_node(Node), + Socket =/= undefined andalso gen_tcp:close(Socket), + true. + +forever() -> + fun() -> receive after infinity -> ok end end. + + +-spec control(Scope :: atom()) -> {Port :: integer(), pid()}. +control(Scope) -> + Control = self(), + erlang:spawn(fun () -> server(Control, Scope) end), + receive + {port, Port, PgPid} -> + {Port, PgPid}; + Other -> + error({error, Other}) + end. 
+ +server(Control, Scope) -> + try + {ok, Pid} = if Scope =:= undefined -> {ok, undefined}; true -> pg:start(Scope) end, + {ok, Listen} = gen_tcp:listen(0, [{mode, binary}, {packet, 4}, {ip, ?LOCALHOST}]), + {ok, Port} = inet:port(Listen), + Control ! {port, Port, Pid}, + {ok, Sock} = gen_tcp:accept(Listen), + server_loop(Sock) + catch + Class:Reason:Stack -> + Control ! {error, {Class, Reason, Stack}} + end. + +server_loop(Sock) -> + inet:setopts(Sock, [{active, once}]), + receive + {tcp, Sock, Data} -> + {call, M, F, A} = binary_to_term(Data), + Ret = + try + erlang:apply(M, F, A) of + Res -> + {ok, Res} + catch + exit:Reason -> + {error, {'EXIT', Reason}}; + error:Reason -> + {error, {'EXIT', Reason}} + end, + ok = gen_tcp:send(Sock, term_to_binary(Ret)), + server_loop(Sock); + {tcp_closed, Sock} -> + erlang:halt(1) + end. diff --git a/lib/runtime_tools/src/observer_backend.erl b/lib/runtime_tools/src/observer_backend.erl index f8eb380c02..e38757b939 100644 --- a/lib/runtime_tools/src/observer_backend.erl +++ b/lib/runtime_tools/src/observer_backend.erl @@ -766,6 +766,7 @@ sys_tables() -> mnesia_gvar, mnesia_stats, % mnesia_transient_decision, pg2_table, + pg, queue, schema, shell_records, @@ -777,7 +778,7 @@ sys_tables() -> sys_processes() -> [auth, code_server, global_name_server, inet_db, - mnesia_recover, net_kernel, timer_server, wxe_master]. + mnesia_recover, net_kernel, pg, timer_server, wxe_master]. mnesia_tables() -> [ir_AliasDef, ir_ArrayDef, ir_AttributeDef, ir_ConstantDef, diff --git a/system/doc/design_principles/distributed_applications.xml b/system/doc/design_principles/distributed_applications.xml index a1a0149eb5..451c90a8dc 100644 --- a/system/doc/design_principles/distributed_applications.xml +++ b/system/doc/design_principles/distributed_applications.xml @@ -45,7 +45,7 @@ addressing mechanism is required to ensure that it can be addressed by other applications, regardless on which node it currently executes. 
This issue is not addressed here, but the - global or pg2 modules in Kernel + global or pg modules in Kernel can be used for this purpose.

diff --git a/system/doc/tutorial/distribution.xml b/system/doc/tutorial/distribution.xml index b489410841..b337dc59b1 100644 --- a/system/doc/tutorial/distribution.xml +++ b/system/doc/tutorial/distribution.xml @@ -59,7 +59,7 @@ global_group - Grouping nodes to global name registration groups. net_adm - Various net administration routines. net_kernel - Networking kernel. - pg2 - Distributed named process groups. + pg - Distributed named process groups. pool - Load distribution facility. slave - Functions for starting and controlling slave nodes. diff --git a/system/doc/tutorial/overview.xml b/system/doc/tutorial/overview.xml index bd652b1e4b..7f38e806d2 100644 --- a/system/doc/tutorial/overview.xml +++ b/system/doc/tutorial/overview.xml @@ -64,7 +64,7 @@ (describes the BIFs)
global manual page in Kernel net_adm manual page in Kernel - pg2 manual page in Kernel + pg manual page in Kernel rpc manual page in Kernel pool manual page in STDLIB slave manual page in STDLIB -- cgit v1.2.1