diff options
author | Rickard Green <rickard@erlang.org> | 2022-05-25 20:45:06 +0200 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2022-05-25 20:45:06 +0200 |
commit | 82c2c9d39e4041df97e175de5e9597736c3c631f (patch) | |
tree | c71db0e57496f749f1b6f6b3402a5f784b557d17 | |
parent | 78c9e20ed86831c6d9618735c295475a7b9fedc4 (diff) | |
parent | 1d0fe6f71c320d2d1d19d9efd6dcba7b18bedca6 (diff) | |
download | erlang-82c2c9d39e4041df97e175de5e9597736c3c631f.tar.gz |
Merge branch 'rickard/global-fixes/22.3.4/OTP-17934' into rickard/global-fixes/23.3.4/OTP-17934
* rickard/global-fixes/22.3.4/OTP-17934:
[kernel] Introduce connect_all kernel parameter
[kernel] global fixes
[kernel] Monitor nodeup/nodedown directly from global
[kernel] Fix global group configuration
[erts,kernel] Connection ID information
kernel: Fix test case monitor_nodess_down_up
Guarantee nodedown before nodeup messages
26 files changed, 2960 insertions, 1148 deletions
diff --git a/bootstrap/lib/stdlib/ebin/erl_internal.beam b/bootstrap/lib/stdlib/ebin/erl_internal.beam Binary files differindex e48f3a1b3e..b980705570 100644 --- a/bootstrap/lib/stdlib/ebin/erl_internal.beam +++ b/bootstrap/lib/stdlib/ebin/erl_internal.beam diff --git a/erts/doc/src/erl_cmd.xml b/erts/doc/src/erl_cmd.xml index 372aee0380..5a59b5a8a2 100644 --- a/erts/doc/src/erl_cmd.xml +++ b/erts/doc/src/erl_cmd.xml @@ -218,10 +218,11 @@ </item> <tag><marker id="connect_all"/><c><![CDATA[-connect_all false]]></c></tag> <item> - <p>If this flag is present, <c><![CDATA[global]]></c> does not maintain - a fully connected network of distributed Erlang nodes, and then - global name registration cannot be used; see - <seeerl marker="kernel:global"><c>global(3)</c></seeerl>.</p> + <p> + This flag is deprecated and has been replaced by the <c>kernel</c> + application parameter + <seeapp marker="kernel:kernel_app#connect_all"><c>connect_all</c></seeapp>. + </p> </item> <tag><c><![CDATA[-cookie Cookie]]></c></tag> <item> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index dcefdafcaf..965678e85a 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -3886,6 +3886,91 @@ RealSystem = system + MissedSystem</code> </func> <func> + <name name="nodes" arity="2" since="OTP @OTP-17934@"/> + <fsummary>All nodes of a certain type in the system.</fsummary> + <desc> + <p> + Returns a list of <c><anno>NodeInfo</anno></c> tuples. The first + element is the node name. Nodes to be included in the list are + determined by the first argument <c><anno>Arg</anno></c> in the same + way as for + <seemfa marker="#nodes/1"><c>nodes(<anno>Arg</anno>)</c></seemfa>. + The second element of <c><anno>NodeInfo</anno></c> tuples is a map + containing further information about the node identified by the + first element. The information present in this map is determined by + the <c><anno>InfoOpts</anno></c> map passed as the second argument. 
+ Currently the following associations are allowed in the + <c><anno>InfoOpts</anno></c> map:</p> + <taglist> + <tag><c>connection_id => boolean()</c></tag> + <item><p> + If the value of the association equals <c>true</c>, the <c>Info</c> + map in the returned result will contain the key <c>connection_id</c> + associated with the value <c><anno>ConnectionId</anno></c>. If + <c><anno>ConnectionId</anno></c> equals <c>undefined</c>, the node + is not connected to the node which the caller is executing on, or + is the node which the caller is executing on. If + <c><anno>ConnectionId</anno></c> is an integer, the node is + currently connected to the node which the caller is executing on. + </p> + <p> + <marker id="connection_id"/> + The integer connection identifier value together with a node name + identifies a specific connection instance to the node with that + node name. The connection identifier value is node local. That is, + on the other node the connection identifier will <i>not</i> be the + same value. If a connection is taken down and then taken up again, + the connection identifier value will change for the connection to + that node. The number of values for connection identifiers is + limited, so it is possible to see the same value for different + instances, but quite unlikely. It is undefined how the value + changes between two consecutive connection instances. + </p></item> + <tag><c>node_type => boolean()</c></tag> + <item><p> + If the value of the association equals <c>true</c>, the <c>Info</c> + map in the returned result will contain the key <c>node_type</c> + associated with the value <c><anno>NodeTypeInfo</anno></c>. + Currently the following node types exist:</p> + <taglist> + <tag><c>visible</c></tag> + <item><p> + The node is connected to the node of the calling process + through an ordinary visible connection. That is, the node + name would appear in the result returned by + <seemfa marker="#nodes/0"><c>nodes/0</c></seemfa>. 
+ </p></item> + <tag><c>hidden</c></tag> + <item><p> + The node is connected to the node of the calling process + through a hidden connection. That is, the node + name would <i>not</i> appear in the result returned by + <seemfa marker="#nodes/0"><c>nodes/0</c></seemfa>. + </p></item> + <tag><c>this</c></tag> + <item><p> + This is the node of the calling process. + </p></item> + <tag><c>known</c></tag> + <item><p> + The node is not connected but known to the node of the + calling process. + </p></item> + </taglist> + </item> + </taglist> + <p>Example:</p> + <code type="erl"> +(a@localhost)1> nodes([this, connected], #{connection_id=>true, node_type=>true}). +[{c@localhost,#{connection_id => 13892108,node_type => hidden}}, + {b@localhost,#{connection_id => 3067553,node_type => visible}}, + {a@localhost,#{connection_id => undefined,node_type => this}}] +(a@localhost)2> + </code> + </desc> + </func> + + <func> <name name="now" arity="0" since=""/> <fsummary>Elapsed time since 00:00 GMT.</fsummary> <type name="timestamp"/> diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 65f78f5541..7a3f5b8498 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -181,6 +181,7 @@ atom convert_time_unit atom connect atom connected atom connection_closed +atom connection_id atom const atom context_switches atom control diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 5fb8441a15..b3abebfc98 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -110,7 +110,9 @@ bif erlang:monitor_node/2 bif erlang:monitor_node/3 ubif erlang:node/1 ubif erlang:node/0 +bif erlang:nodes/0 bif erlang:nodes/1 +bif erlang:nodes/2 bif erlang:now/0 bif erlang:monotonic_time/0 bif erlang:monotonic_time/1 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index c79c0834f6..99f1e3cca2 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -47,6 +47,7 @@ #include 
"dtrace-wrapper.h" #include "erl_proc_sig_queue.h" #include "erl_global_literals.h" +#include "erl_map.h" #define DIST_CTL_DEFAULT_SIZE 64 @@ -194,7 +195,7 @@ static Export *dist_ctrl_put_data_trap; static void erts_schedule_dist_command(Port *, DistEntry *); static int dsig_send_exit(ErtsDSigSendContext *ctx, Eterm ctl, Eterm msg); static int dsig_send_ctl(ErtsDSigSendContext *ctx, Eterm ctl); -static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); +static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Uint32, Eterm, Eterm); static void init_nodes_monitors(void); static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id, int *was_connected_p); @@ -301,6 +302,7 @@ typedef enum { typedef struct { ErtsConMonLnkSeqCleanupState state; DistEntry* dep; + Uint32 connection_id; ErtsMonLnkDist *dist; DistSeqNode *seq; void *yield_state; @@ -379,6 +381,7 @@ con_monitor_link_seq_cleanup(void *vcmlcp) send_nodes_mon_msgs(NULL, am_nodedown, cmlcp->nodename, + cmlcp->connection_id, cmlcp->visability, cmlcp->reason); erts_de_rwlock(cmlcp->dep); @@ -444,10 +447,13 @@ schedule_con_monitor_link_seq_cleanup(DistEntry* dep, cmlcp->yield_state = NULL; cmlcp->dist = dist; - if (!dist) + if (!dist) { cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS; + cmlcp->connection_id = 0; + } else { cmlcp->state = ERTS_CML_CLEANUP_STATE_LINKS; + cmlcp->connection_id = dist->connection_id; erts_mtx_lock(&dist->mtx); ASSERT(dist->alive); dist->alive = 0; @@ -794,7 +800,8 @@ set_node_not_alive(void *unused) erts_thr_progress_block(); erts_set_this_node(am_Noname, 0); erts_is_alive = 0; - send_nodes_mon_msgs(NULL, am_nodedown, nodename, am_visible, nodedown.reason); + send_nodes_mon_msgs(NULL, am_nodedown, nodename, ~((Uint32) 0), + am_visible, nodedown.reason); nodedown.reason = NIL; bp = nodedown.bp; nodedown.bp = NULL; @@ -4640,7 +4647,8 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) erts_set_this_node(BIF_ARG_1, (Uint32) creation); erts_this_dist_entry->creation = creation; 
erts_is_alive = 1; - send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); + send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, ~((Uint32) 0), + am_visible, NIL); erts_proc_lock(net_kernel, ERTS_PROC_LOCKS_ALL); /* By setting F_DISTRIBUTION on net_kernel, @@ -5031,6 +5039,7 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, send_nodes_mon_msgs(c_p, am_nodeup, dep->sysname, + dep->connection_id, flags & DFLAG_PUBLISHED ? am_visible : am_hidden, NIL); @@ -5744,38 +5753,54 @@ BIF_RETTYPE node_0(BIF_ALIST_0) BIF_RET(erts_this_dist_entry->sysname); } - /**********************************************************************/ /* nodes() -> [ Node ] */ -#if 0 /* Done in erlang.erl instead. */ +static BIF_RETTYPE nodes(Process *c_p, Eterm node_types, Eterm options); + BIF_RETTYPE nodes_0(BIF_ALIST_0) { - return nodes_1(BIF_P, am_visible); + return nodes(BIF_P, am_visible, THE_NON_VALUE); } -#endif - BIF_RETTYPE nodes_1(BIF_ALIST_1) { + return nodes(BIF_P, BIF_ARG_1, THE_NON_VALUE); +} + +BIF_RETTYPE nodes_2(BIF_ALIST_2) +{ + return nodes(BIF_P, BIF_ARG_1, BIF_ARG_2); +} + +typedef struct { + Eterm name; + Eterm type; + Uint32 cid; +} ErtsNodeInfo; + +static BIF_RETTYPE +nodes(Process *c_p, Eterm node_types, Eterm options) +{ + BIF_RETTYPE ret_val; + ErtsNodeInfo *eni, *eni_start = NULL, *eni_end; Eterm result; - int length; - Eterm* hp; + Uint length; int not_connected = 0; int visible = 0; int hidden = 0; int this = 0; - DeclareTmpHeap(buf,2,BIF_P); /* For one cons-cell */ + int node_type = 0; + int connection_id = 0; + int xinfo = 0; + Eterm tmp_heap[2]; /* For one cons-cell */ DistEntry *dep; - Eterm arg_list = BIF_ARG_1; -#ifdef DEBUG - Eterm* endp; -#endif - - UseTmpHeap(2,BIF_P); + Eterm arg_list; - if (is_atom(BIF_ARG_1)) - arg_list = CONS(buf, BIF_ARG_1, NIL); + if (is_atom(node_types)) + arg_list = CONS(&tmp_heap[0], node_types, NIL); + else + arg_list = node_types; while (is_list(arg_list)) { switch(CAR(list_val(arg_list))) { @@ -5784,13 
+5809,43 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) case am_known: visible = hidden = not_connected = this = 1; break; case am_this: this = 1; break; case am_connected: visible = hidden = 1; break; - default: goto error; break; + default: goto badarg; break; } arg_list = CDR(list_val(arg_list)); } if (is_not_nil(arg_list)) { - goto error; + goto badarg; + } + + if (is_value(options)) { + if (is_not_map(options)) { + goto badarg; + } + else { + Sint no_opts = 0; + const Eterm *conn_idp = erts_maps_get(am_connection_id, options); + const Eterm *node_typep = erts_maps_get(am_node_type, options); + if (conn_idp) { + switch (*conn_idp) { + case am_true: connection_id = !0; break; + case am_false: connection_id = 0; break; + default: goto badarg; + } + no_opts++; + } + if (node_typep) { + switch (*node_typep) { + case am_true: node_type = !0; break; + case am_false: node_type = 0; break; + default: goto badarg; + } + no_opts++; + } + if (no_opts != erts_map_size(options)) + goto badarg; /* got invalid options... 
*/ + xinfo = !0; + } } length = 0; @@ -5815,50 +5870,130 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) if (length == 0) { erts_rwmtx_runlock(&erts_dist_table_rwmtx); - goto done; + ERTS_BIF_PREP_RET(ret_val, NIL); + return ret_val; } - hp = HAlloc(BIF_P, 2*length); + eni_start = eni = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsNodeInfo)*length); -#ifdef DEBUG - endp = hp + length*2; -#endif - if(not_connected) { - for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { - if (dep != erts_this_dist_entry) { - result = CONS(hp, dep->sysname, result); - hp += 2; - } + if (this) { + eni->name = erts_this_dist_entry->sysname; + eni->type = am_this; + eni->cid = ~((Uint32) 0); + eni++; + } + + if (visible) { + for (dep = erts_visible_dist_entries; dep; dep = dep->next) { + eni->name = dep->sysname; + eni->type = am_visible; + eni->cid = dep->connection_id; + ASSERT(eni->cid >= 0); + eni++; } - for(dep = erts_pending_dist_entries; dep; dep = dep->next) { - result = CONS(hp, dep->sysname, result); - hp += 2; - } } - if(hidden) - for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { - result = CONS(hp, dep->sysname, result); - hp += 2; - } - if(visible) - for(dep = erts_visible_dist_entries; dep; dep = dep->next) { - result = CONS(hp, dep->sysname, result); - hp += 2; - } - if(this) { - result = CONS(hp, erts_this_dist_entry->sysname, result); - hp += 2; + + if (hidden) { + for (dep = erts_hidden_dist_entries; dep; dep = dep->next) { + eni->name = dep->sysname; + eni->type = am_hidden; + eni->cid = dep->connection_id; + eni++; + } + } + + if (not_connected) { + for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { + if (dep != erts_this_dist_entry) { + eni->name = dep->sysname; + eni->type = am_known; + eni->cid = ~((Uint32) 0); + eni++; + } + } + for (dep = erts_pending_dist_entries; dep; dep = dep->next) { + eni->name = dep->sysname; + eni->type = am_known; + eni->cid = ~((Uint32) 0); + eni++; + } } - ASSERT(endp == hp); + 
erts_rwmtx_runlock(&erts_dist_table_rwmtx); -done: - UnUseTmpHeap(2,BIF_P); - BIF_RET(result); + eni_end = eni; + + result = NIL; + if (!xinfo) { + Eterm *hp = HAlloc(c_p, 2*length); + for (eni = eni_start; eni < eni_end; eni++) { + result = CONS(hp, eni->name, result); + hp += 2; + } + } + else { + Eterm ks[2], *hp, keys_tuple = THE_NON_VALUE; + Uint map_size = 0, el_xtra, xtra; + ErtsHeapFactory hfact; + + erts_factory_proc_init(&hfact, c_p); + + if (connection_id) { + ks[map_size++] = am_connection_id; + } + if (node_type) { + ks[map_size++] = am_node_type; + } + + el_xtra = 3 + 2 + MAP_HEADER_FLATMAP_SZ + map_size; + xtra = length*el_xtra; + + for (eni = eni_start; eni < eni_end; eni++) { + Eterm vs[2], info_map, tuple; + map_size = 0; + if (connection_id) { + Eterm cid; + if (eni->cid == ~((Uint32) 0)) + cid = am_undefined; + else if (IS_USMALL(0, (Uint) eni->cid)) + cid = make_small((Uint) eni->cid); + else { + hp = erts_produce_heap(&hfact, BIG_UINT_HEAP_SIZE, xtra); + cid = uint_to_big((Uint) eni->cid, hp); + } + vs[map_size++] = cid; + } + if (node_type) { + vs[map_size++] = eni->type; + } + + info_map = erts_map_from_sorted_ks_and_vs(&hfact, ks, vs, + map_size, &keys_tuple); + + hp = erts_produce_heap(&hfact, 3+2, xtra); + + tuple = TUPLE2(hp, eni->name, info_map); + hp += 3; + result = CONS(hp, tuple, result); + xtra -= el_xtra; + } + + erts_factory_close(&hfact); + } -error: - UnUseTmpHeap(2,BIF_P); - BIF_ERROR(BIF_P,BADARG); + erts_free(ERTS_ALC_T_TMP, (void *) eni_start); + + if (length > 10) { + Uint reds = length / 10; + BUMP_REDS(c_p, reds); + } + + ERTS_BIF_PREP_RET(ret_val, result); + return ret_val; + +badarg: + ERTS_BIF_PREP_ERROR(ret_val, c_p, BADARG); + return ret_val; } /**********************************************************************/ @@ -6089,6 +6224,8 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) #define ERTS_NODES_MON_OPT_TYPE_VISIBLE (((Uint16) 1) << 0) #define ERTS_NODES_MON_OPT_TYPE_HIDDEN (((Uint16) 1) << 1) #define 
ERTS_NODES_MON_OPT_DOWN_REASON (((Uint16) 1) << 2) +#define ERTS_NODES_MON_OPT_INFO_MAP (((Uint16) 1) << 3) +#define ERTS_NODES_MON_OPT_CONN_ID (((Uint16) 1) << 4) #define ERTS_NODES_MON_OPT_TYPES \ (ERTS_NODES_MON_OPT_TYPE_VISIBLE|ERTS_NODES_MON_OPT_TYPE_HIDDEN) @@ -6118,10 +6255,10 @@ init_nodes_monitors(void) } Eterm -erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) +erts_monitor_nodes(Process *c_p, Eterm on, Eterm options) { - Eterm key, old_value, opts_list = olist; - Uint opts = (Uint) 0; + Eterm key, old_value; + Uint opts = (Uint) ERTS_NODES_MON_OPT_INFO_MAP; ASSERT(c_p); ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); @@ -6129,55 +6266,63 @@ erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) if (on != am_true && on != am_false) return THE_NON_VALUE; - if (is_not_nil(opts_list)) { - int all = 0, visible = 0, hidden = 0; - - while (is_list(opts_list)) { - Eterm *cp = list_val(opts_list); - Eterm opt = CAR(cp); - opts_list = CDR(cp); - if (opt == am_nodedown_reason) + if (is_nil(options)) { + opts &= ~ERTS_NODES_MON_OPT_INFO_MAP; + } + else if (is_not_map(options)) { + return THE_NON_VALUE; + } + else { + Sint no_opts = 0; + const Eterm *l = erts_maps_get(am_list, options); + const Eterm *cid = erts_maps_get(am_connection_id, options); + const Eterm *nt = erts_maps_get(am_node_type, options); + const Eterm *nr = erts_maps_get(am_nodedown_reason, options); + if (l) { + if (*l == am_true) { + opts &= ~ERTS_NODES_MON_OPT_INFO_MAP; + } + else { + return THE_NON_VALUE; + } + no_opts++; + } + if (cid) { + if (*cid == am_true) { + opts |= ERTS_NODES_MON_OPT_CONN_ID; + } + else if (*cid != am_false) { + return THE_NON_VALUE; + } + no_opts++; + } + if (nt) { + switch (*nt) { + case am_visible: + opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; + break; + case am_hidden: + opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; + break; + case am_all: + opts |= ERTS_NODES_MON_OPT_TYPES; + break; + default: + return THE_NON_VALUE; + } + no_opts++; + } + if 
(nr) { + if (*nr == am_true) { opts |= ERTS_NODES_MON_OPT_DOWN_REASON; - else if (is_tuple(opt)) { - Eterm* tp = tuple_val(opt); - if (arityval(tp[0]) != 2) - return THE_NON_VALUE; - switch (tp[1]) { - case am_node_type: - switch (tp[2]) { - case am_visible: - if (hidden || all) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; - visible = 1; - break; - case am_hidden: - if (visible || all) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; - hidden = 1; - break; - case am_all: - if (visible || hidden) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPES; - all = 1; - break; - default: - return THE_NON_VALUE; - } - break; - default: - return THE_NON_VALUE; - } - } - else { - return THE_NON_VALUE; - } - } - - if (is_not_nil(opts_list)) - return THE_NON_VALUE; + } + else if (*nr != am_false) { + return THE_NON_VALUE; + } + no_opts++; + } + if (no_opts != erts_map_size(options)) + return THE_NON_VALUE; /* got invalid options... */ } key = make_small(opts); @@ -6270,8 +6415,24 @@ save_nodes_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) return 1; } +#define ERTS_MON_NODES_MAX_INFO_LIST_SZ__(MAX_ELEMS) \ + ((MAX_ELEMS)*(3 /* key/value 2-tuple */ + 2/* cons cell */) \ + + BIG_UINT_HEAP_SIZE /* connection id value */ \ + + 4 /* top 3-tuple */) +#define ERTS_MON_NODES_MAX_INFO_MAP_SZ__(MAX_ELEMS) \ + ((MAX_ELEMS)*2 /* keys and values */ \ + + 1 /* key tuple header */ + MAP_HEADER_FLATMAP_SZ /* 3 */ \ + + BIG_UINT_HEAP_SIZE /* connection id value */ \ + + 4 /* top 3-tuple */) +#define ERTS_MON_NODES_MAX_INFO_SZ__(MAX_ELEMS) \ + ((ERTS_MON_NODES_MAX_INFO_MAP_SZ__((MAX_ELEMS)) \ + > ERTS_MON_NODES_MAX_INFO_LIST_SZ__((MAX_ELEMS))) \ + ? 
ERTS_MON_NODES_MAX_INFO_MAP_SZ__((MAX_ELEMS)) \ + : ERTS_MON_NODES_MAX_INFO_LIST_SZ__((MAX_ELEMS))) + static void -send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reason) +send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, + Uint32 connection_id, Eterm type, Eterm reason) { Uint opts; Uint i, no, reason_size; @@ -6317,7 +6478,8 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas erts_mtx_unlock(&nodes_monitors_mtx); for (i = 0; i < no; i++) { - Eterm tmp_heap[3+2+3+2+4 /* max need */]; + ErtsHeapFactory hfact; + Eterm tmp_heap[ERTS_MON_NODES_MAX_INFO_SZ__(3/* max info elements */)]; Eterm *hp, msg; Uint hsz; @@ -6343,42 +6505,114 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas } } + /* + * tmp_heap[] is sized so there will be room for everything + * we need assuming no info, a two-tuple info list, or an info + * flat map is generated. In case there would be a greater heap + * need this will be taken care of by the heap factory... + */ + erts_factory_tmp_init(&hfact, + &tmp_heap[0], + sizeof(tmp_heap)/sizeof(Uint), + ERTS_ALC_T_TMP); hsz = 0; - hp = &tmp_heap[0]; if (!opts) { + hp = erts_produce_heap(&hfact, 3, 0); msg = TUPLE2(hp, what, node); - hp += 3; } - else { + else { /* Info list or map... 
*/ Eterm tup; - Eterm info = NIL; + Eterm info; - if (opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE - | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { + if (opts & ERTS_NODES_MON_OPT_INFO_MAP) { /* Info map */ + Uint map_size = 0; + Eterm ks[3], vs[3]; - tup = TUPLE2(hp, am_node_type, type); - hp += 3; - info = CONS(hp, tup, info); - hp += 2; - } + if (opts & ERTS_NODES_MON_OPT_CONN_ID) { + Eterm cid; + if (connection_id == ~((Uint32) 0)) { + cid = am_undefined; + } + else if (IS_USMALL(0, (Uint) connection_id)) { + cid = make_small(connection_id); + } + else { + hp = erts_produce_heap(&hfact, BIG_UINT_HEAP_SIZE, 0); + cid = uint_to_big(connection_id, hp); + } + ks[map_size] = am_connection_id; + vs[map_size] = cid; + map_size++; + } + if (opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE + | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { + ks[map_size] = am_node_type; + vs[map_size] = type; + map_size++; + } + if (what == am_nodedown + && (opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { + hsz += reason_size; + ks[map_size] = am_nodedown_reason; + vs[map_size] = reason; + map_size++; + } - if (what == am_nodedown - && (opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { - hsz += reason_size; - tup = TUPLE2(hp, am_nodedown_reason, reason); - hp += 3; - info = CONS(hp, tup, info); - hp += 2; + info = erts_map_from_sorted_ks_and_vs(&hfact, ks, vs, + map_size, NULL); + ASSERT(is_value(info)); } + else { /* Info list */ + + info = NIL; + if (opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE + | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { + hp = erts_produce_heap(&hfact, 3 + 2, 0); + tup = TUPLE2(hp, am_node_type, type); + hp += 3; + info = CONS(hp, tup, info); + } + + if (what == am_nodedown + && (opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { + hp = erts_produce_heap(&hfact, 3 + 2, 0); + hsz += reason_size; + tup = TUPLE2(hp, am_nodedown_reason, reason); + hp += 3; + info = CONS(hp, tup, info); + } + if (opts & ERTS_NODES_MON_OPT_CONN_ID) { + Eterm cid; + if (connection_id == ~((Uint32) 0)) { + cid = am_undefined; + } + else if (IS_USMALL(0, 
(Uint) connection_id)) { + cid = make_small(connection_id); + } + else { + hp = erts_produce_heap(&hfact, BIG_UINT_HEAP_SIZE, 0); + cid = uint_to_big(connection_id, hp); + } + hp = erts_produce_heap(&hfact, 3 + 2, 0); + tup = TUPLE2(hp, am_connection_id, cid); + hp += 3; + info = CONS(hp, tup, info); + } + } + + hp = erts_produce_heap(&hfact, 4, 0); msg = TUPLE3(hp, what, node, info); - hp += 4; } - ASSERT(hp - &tmp_heap[0] <= sizeof(tmp_heap)/sizeof(tmp_heap[0])); - hsz += hp - &tmp_heap[0]; + hsz += hfact.hp - hfact.hp_start; + if (hfact.heap_frags) { + ErlHeapFragment *bp; + for (bp = hfact.heap_frags; bp; bp = bp->next) + hsz += bp->used_size; + } erts_proc_sig_send_persistent_monitor_msg(ERTS_MON_TYPE_NODES, nmdp[i].options, @@ -6386,6 +6620,8 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas nmdp[i].pid, msg, hsz); + + erts_factory_close(&hfact); } if (nmdp != &def_buf[0]) diff --git a/erts/emulator/beam/erl_map.c b/erts/emulator/beam/erl_map.c index e7c649c5a7..41cff741f8 100644 --- a/erts/emulator/beam/erl_map.c +++ b/erts/emulator/beam/erl_map.c @@ -121,28 +121,35 @@ void erts_init_map(void) { */ BIF_RETTYPE map_size_1(BIF_ALIST_1) { - if (is_flatmap(BIF_ARG_1)) { - flatmap_t *mp = (flatmap_t*)flatmap_val(BIF_ARG_1); - BIF_RET(make_small(flatmap_get_size(mp))); - } else if (is_hashmap(BIF_ARG_1)) { - Eterm *head; - Uint size; + Sint size = erts_map_size(BIF_ARG_1); + if (size < 0) { + BIF_P->fvalue = BIF_ARG_1; + BIF_ERROR(BIF_P, BADMAP); + } - head = hashmap_val(BIF_ARG_1); - size = head[1]; + /* + * As long as a small has 28 bits (on a 32-bit machine) for + * the integer itself, it is impossible to build a map whose + * size would not fit in a small. Add an assertion in case we + * ever decreases the number of bits in a small. 
+ */ + ASSERT(IS_USMALL(0, size)); + BIF_RET(make_small(size)); +} - /* - * As long as a small has 28 bits (on a 32-bit machine) for - * the integer itself, it is impossible to build a map whose - * size would not fit in a small. Add an assertion in case we - * ever decreases the number of bits in a small. - */ - ASSERT(IS_USMALL(0, size)); - BIF_RET(make_small(size)); +Sint +erts_map_size(Eterm map) +{ + if (is_flatmap(map)) { + flatmap_t *mp = (flatmap_t*)flatmap_val(map); + return (Sint) flatmap_get_size(mp); + } + else if (is_hashmap(map)) { + Eterm *head = hashmap_val(map); + return (Sint) head[1]; } - BIF_P->fvalue = BIF_ARG_1; - BIF_ERROR(BIF_P, BADMAP); + return -1; } /* maps:find/2 @@ -478,38 +485,87 @@ Eterm erts_hashmap_from_array(ErtsHeapFactory* factory, Eterm *leafs, Uint n, return res; } -Eterm erts_map_from_ks_and_vs(ErtsHeapFactory *factory, Eterm *ks0, Eterm *vs0, Uint n) +static ERTS_INLINE Eterm +from_ks_and_vs(ErtsHeapFactory *factory, Eterm *ks, Eterm *vs, + Uint n, Eterm *key_tuple, flatmap_t **fmpp) { if (n <= MAP_SMALL_MAP_LIMIT) { - Eterm *ks, *vs, *hp; - flatmap_t *mp; + Eterm *hp; + flatmap_t *fmp; Eterm keys; - hp = erts_produce_heap(factory, 3 + 1 + (2 * n), 0); - keys = make_tuple(hp); - *hp++ = make_arityval(n); - ks = hp; - hp += n; - mp = (flatmap_t*)hp; - hp += MAP_HEADER_FLATMAP_SZ; - vs = hp; + if (key_tuple && is_value(*key_tuple)) { + keys = *key_tuple; + hp = erts_produce_heap(factory, MAP_HEADER_FLATMAP_SZ + n, 0); + ASSERT(is_tuple_arity(keys, n)); + ASSERT(sys_memcmp((void *) (tuple_val(keys) + 1), + (void *) ks, + n * sizeof(Eterm)) == 0); + } + else { + hp = erts_produce_heap(factory, 1 + MAP_HEADER_FLATMAP_SZ + 2*n, 0); + keys = make_tuple(hp); + if (key_tuple) { + *key_tuple = keys; + } + *hp++ = make_arityval(n); + sys_memcpy((void *) hp, + (void *) ks, + n * sizeof(Eterm)); + hp += n; + } - mp->thing_word = MAP_HEADER_FLATMAP; - mp->size = n; - mp->keys = keys; + fmp = (flatmap_t*)hp; + hp += MAP_HEADER_FLATMAP_SZ; + 
+ fmp->thing_word = MAP_HEADER_FLATMAP; + fmp->size = n; + fmp->keys = keys; - sys_memcpy(ks, ks0, n * sizeof(Eterm)); - sys_memcpy(vs, vs0, n * sizeof(Eterm)); + sys_memcpy((void *) hp, (void *) vs, n * sizeof(Eterm)); - if (!erts_validate_and_sort_flatmap(mp)) { + if (fmpp) { + *fmpp = fmp; return THE_NON_VALUE; } - - return make_flatmap(mp); + return make_flatmap(fmp); } else { - return erts_hashmap_from_ks_and_vs(factory, ks0, vs0, n); + if (fmpp) { + *fmpp = NULL; + } + return erts_hashmap_from_ks_and_vs(factory, ks, vs, n); } - return THE_NON_VALUE; +} + +Eterm erts_map_from_ks_and_vs(ErtsHeapFactory *factory, Eterm *ks, Eterm *vs, Uint n) +{ + Eterm res; + flatmap_t *fmp; + + res = from_ks_and_vs(factory, ks, vs, n, NULL, &fmp); + if (fmp) { + if (erts_validate_and_sort_flatmap(fmp)) { + res = make_flatmap(fmp); + } + else { + res = THE_NON_VALUE; + } + } + return res; +} + +Eterm erts_map_from_sorted_ks_and_vs(ErtsHeapFactory *factory, Eterm *ks, Eterm *vs, + Uint n, Eterm *key_tuple) +{ +#ifdef DEBUG + Uint i; + /* verify that key array contains unique and sorted keys... 
*/ + for (i = 1; i < n; i++) { + ASSERT(CMP_TERM(ks[i-1], ks[i]) < 0); + } +#endif + + return from_ks_and_vs(factory, ks, vs, n, key_tuple, NULL); } diff --git a/erts/emulator/beam/erl_map.h b/erts/emulator/beam/erl_map.h index 718d400e22..6236ac8e2f 100644 --- a/erts/emulator/beam/erl_map.h +++ b/erts/emulator/beam/erl_map.h @@ -101,6 +101,8 @@ Eterm erts_hashmap_from_array(ErtsHeapFactory*, Eterm *leafs, Uint n, int rejec erts_hashmap_from_ks_and_vs_extra((F), (KS), (VS), (N), THE_NON_VALUE, THE_NON_VALUE); Eterm erts_map_from_ks_and_vs(ErtsHeapFactory *factory, Eterm *ks, Eterm *vs, Uint n); +Eterm erts_map_from_sorted_ks_and_vs(ErtsHeapFactory *factory, Eterm *ks0, Eterm *vs0, + Uint n, Eterm *key_tuple); Eterm erts_hashmap_from_ks_and_vs_extra(ErtsHeapFactory *factory, Eterm *ks, Eterm *vs, Uint n, Eterm k, Eterm v); @@ -109,6 +111,8 @@ const Eterm *erts_maps_get(Eterm key, Eterm map); const Eterm *erts_hashmap_get(Uint32 hx, Eterm key, Eterm map); +Sint erts_map_size(Eterm map); + /* hamt nodes v2.0 * * node :: leaf | array | bitmap diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 75c6b6abce..f71a6f67df 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -152,9 +152,13 @@ dist_table_alloc(void *dep_tmpl) Eterm sysname; Binary *bin; DistEntry *dep; + Uint32 init_connection_id; erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ; + init_connection_id = (Uint32) erts_get_monotonic_time(NULL); + init_connection_id &= ERTS_DIST_CON_ID_MASK; + sysname = ((DistEntry *) dep_tmpl)->sysname; bin = erts_create_magic_binary_x(sizeof(DistEntry), @@ -175,7 +179,7 @@ dist_table_alloc(void *dep_tmpl) dep->creation = 0; /* undefined */ dep->cid = NIL; erts_atomic_init_nob(&dep->input_handler, (erts_aint_t) NIL); - dep->connection_id = 0; + dep->connection_id = init_connection_id; dep->state = ERTS_DE_STATE_IDLE; dep->pending_nodedown 
= 0; dep->suspended_nodeup = NULL; diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index 2e02fd95e8..a77d71fa9f 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -41,7 +41,7 @@ init_per_suite/1, end_per_suite/1, init_per_group/2, end_per_group/2, ping/1, bulk_send_small/1, - group_leader/1, + group_leader/1, nodes2/1, optimistic_dflags/1, bulk_send_big/1, bulk_send_bigbig/1, local_send_small/1, local_send_big/1, @@ -94,7 +94,7 @@ suite() -> all() -> [ping, {group, bulk_send}, {group, local_send}, - group_leader, + group_leader, nodes2, optimistic_dflags, link_to_busy, exit_to_busy, lost_exit, link_to_dead, link_to_dead_new_node, @@ -214,6 +214,294 @@ group_leader_1(Node2) -> ?Line {ExtPid, group_leader, GL2} = receive_one(), ok. +nodes2(Config) when is_list(Config) -> + + This = node(), + + ok = net_kernel:monitor_nodes(true, #{node_type => all, + connection_id => true}), + + AlreadyConnected = maps:from_list(lists:map(fun (N) -> + {N, true} + end, nodes(connected))), + AlreadyVisible = maps:from_list(lists:map(fun (N) -> + {N, true} + end, nodes(visible))), + AlreadyHidden = maps:from_list(lists:map(fun (N) -> + {N, true} + end, nodes(visible))), + AlreadyKnown = maps:from_list(lists:map(fun (N) -> + {N, true} + end, nodes(known))), + + {ok, V1} = start_node(visible1), + {ok, H1} = start_node(hidden1, "-hidden"), + {ok, V2} = start_node(visible2), + {ok, H2} = start_node(hidden2, "-hidden"), + + TestNodes = maps:from_list(lists:map(fun (N) -> + {N, true} + end, [This, V1, H1, V2, H2])), + + V1CId = receive {nodeup, V1, #{connection_id := C1, node_type := visible}} -> C1 end, + V2CId = receive {nodeup, V2, #{connection_id := C2, node_type := visible}} -> C2 end, + H1CId = receive {nodeup, H1, #{connection_id := C3, node_type := hidden}} -> C3 end, + H2CId = receive {nodeup, H2, #{connection_id := C4, node_type := hidden}} -> C4 end, + + lists:foreach(fun ({N, 
I}) when N == V1 -> + 2 = maps:size(I), + #{connection_id := V1CId, node_type := visible} = I; + ({N, I}) when N == V2 -> + 2 = maps:size(I), + #{connection_id := V2CId, node_type := visible} = I; + ({N, I}) when N == H1 -> + 2 = maps:size(I), + #{connection_id := H1CId, node_type := hidden} = I; + ({N, I}) when N == H2 -> + 2 = maps:size(I), + #{connection_id := H2CId, node_type := hidden} = I; + ({N, I}) -> + 2 = maps:size(I), + #{connection_id := _, node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyConnected) + end, erlang:nodes(connected, #{connection_id => true, + node_type => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 2 = maps:size(I), + #{connection_id := V1CId, node_type := visible} = I; + ({N, I}) when N == V2 -> + 2 = maps:size(I), + #{connection_id := V2CId, node_type := visible} = I; + ({N, I}) when N == H1 -> + 2 = maps:size(I), + #{connection_id := H1CId, node_type := hidden} = I; + ({N, I}) when N == H2 -> + 2 = maps:size(I), + #{connection_id := H2CId, node_type := hidden} = I; + ({N, I}) when N == This -> + 2 = maps:size(I), + #{connection_id := undefined, node_type := this} = I; + ({N, I}) -> + 2 = maps:size(I), + #{connection_id := _, node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyConnected) + end, erlang:nodes([this, connected], #{connection_id => true, + node_type => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 1 = maps:size(I), + #{connection_id := V1CId} = I; + ({N, I}) when N == V2 -> + 1 = maps:size(I), + #{connection_id := V2CId} = I; + ({N, I}) when N == H1 -> + 1 = maps:size(I), + #{connection_id := H1CId} = I; + ({N, I}) when N == H2 -> + 1 = maps:size(I), + #{connection_id := H2CId} = I; + ({N, I}) -> + 1 = maps:size(I), + #{connection_id := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyConnected) + end, erlang:nodes(connected, #{connection_id => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 1 
= maps:size(I), + #{node_type := visible} = I; + ({N, I}) when N == V2 -> + 1 = maps:size(I), + #{node_type := visible} = I; + ({N, I}) when N == H1 -> + 1 = maps:size(I), + #{node_type := hidden} = I; + ({N, I}) when N == H2 -> + 1 = maps:size(I), + #{node_type := hidden} = I; + ({N, I}) -> + 1 = maps:size(I), + #{node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyConnected) + end, erlang:nodes(connected, #{node_type => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 2 = maps:size(I), + #{connection_id := V1CId, node_type := visible} = I; + ({N, I}) when N == V2 -> + 2 = maps:size(I), + #{connection_id := V2CId, node_type := visible} = I; + ({N, I}) -> + 2 = maps:size(I), + #{connection_id := _, node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyVisible) + end, erlang:nodes(visible, #{connection_id => true, + node_type => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 2 = maps:size(I), + #{connection_id := V1CId, node_type := visible} = I; + ({N, I}) when N == V2 -> + 2 = maps:size(I), + #{connection_id := V2CId, node_type := visible} = I; + ({N, I}) when N == This -> + 2 = maps:size(I), + #{connection_id := undefined, node_type := this} = I; + ({N, I}) -> + 2 = maps:size(I), + #{connection_id := _, node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyVisible) + end, erlang:nodes([this, visible], #{connection_id => true, + node_type => true})), + lists:foreach(fun ({N, I}) when N == H1 -> + 2 = maps:size(I), + #{connection_id := H1CId, node_type := hidden} = I; + ({N, I}) when N == H2 -> + 2 = maps:size(I), + #{connection_id := H2CId, node_type := hidden} = I; + ({N, I}) -> + 2 = maps:size(I), + #{connection_id := _, node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyHidden) + end, erlang:nodes(hidden, #{connection_id => true, + node_type => true})), + [{This, #{connection_id := undefined, + 
node_type := this}}] = erlang:nodes(this, #{connection_id => true, + node_type => true}), + [{This, #{connection_id := undefined}}] = erlang:nodes(this, #{connection_id => true}), + [{This, #{node_type := this}}] = erlang:nodes(this, #{node_type => true}), + + %% Ensure these dist entries are not GC:d yet... + NKV2 = rpc:call(V2, erlang, whereis, [net_kernel]), + true = is_pid(NKV2), + NKH2 = rpc:call(H2, erlang, whereis, [net_kernel]), + true = is_pid(NKH2), + + stop_node(V2), + stop_node(H2), + + receive {nodedown, V2, #{connection_id := V2CId, node_type := visible}} -> ok end, + receive {nodedown, H2, #{connection_id := H2CId, node_type := hidden}} -> ok end, + + lists:foreach(fun ({N, I}) when N == V1 -> + 2 = maps:size(I), + #{connection_id := V1CId, node_type := visible} = I; + ({N, I}) when N == V2 -> + 2 = maps:size(I), + #{connection_id := undefined, node_type := known} = I; + ({N, I}) when N == H1 -> + 2 = maps:size(I), + #{connection_id := H1CId, node_type := hidden} = I; + ({N, I}) when N == H2 -> + 2 = maps:size(I), + #{connection_id := undefined, node_type := known} = I; + ({N, I}) when N == This -> + 2 = maps:size(I), + #{connection_id := undefined, node_type := this} = I; + ({N, I}) -> + 2 = maps:size(I), + #{connection_id := _, node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyKnown) + end, erlang:nodes(known, #{connection_id => true, + node_type => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 1 = maps:size(I), + #{node_type := visible} = I; + ({N, I}) when N == V2 -> + 1 = maps:size(I), + #{node_type := known} = I; + ({N, I}) when N == H1 -> + 1 = maps:size(I), + #{node_type := hidden} = I; + ({N, I}) when N == H2 -> + 1 = maps:size(I), + #{node_type := known} = I; + ({N, I}) when N == This -> + 1 = maps:size(I), + #{node_type := this} = I; + ({N, I}) -> + 1 = maps:size(I), + #{node_type := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyKnown) + end,
erlang:nodes(known, #{node_type => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 1 = maps:size(I), + #{connection_id := V1CId} = I; + ({N, I}) when N == V2 -> + 1 = maps:size(I), + #{connection_id := undefined} = I; + ({N, I}) when N == H1 -> + 1 = maps:size(I), + #{connection_id := H1CId} = I; + ({N, I}) when N == H2 -> + 1 = maps:size(I), + #{connection_id := undefined} = I; + ({N, I}) when N == This -> + 1 = maps:size(I), + #{connection_id := undefined} = I; + ({N, I}) -> + 1 = maps:size(I), + #{connection_id := _} = I, + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyKnown) + end, erlang:nodes(known, #{connection_id => true})), + lists:foreach(fun ({N, I}) when N == V1 -> + 0 = maps:size(I), + #{} = I; + ({N, I}) when N == V2 -> + 0 = maps:size(I), + #{} = I; + ({N, I}) when N == H1 -> + 0 = maps:size(I), + #{} = I; + ({N, I}) when N == H2 -> + 0 = maps:size(I), + #{} = I; + ({N, I}) when N == This -> + 0 = maps:size(I), + #{} = I; + ({N, I}) -> + 0 = maps:size(I), + false = maps:is_key(N, TestNodes), + true = maps:is_key(N, AlreadyKnown) + end, erlang:nodes(known, #{})), + + stop_node(V1), + stop_node(H1), + + id(NKV2), + id(NKH2), + + try erlang:nodes("visible", #{connection_id => true}) + catch error:badarg -> ok + end, + try erlang:nodes([another], #{connection_id => true}) + catch error:badarg -> ok + end, + try erlang:nodes(visible, #{cid => true}) + catch error:badarg -> ok + end, + try erlang:nodes(visible, #{connection_id => yes}) + catch error:badarg -> ok + end, + try erlang:nodes(visible, #{node_type => yes}) + catch error:badarg -> ok + end, + try erlang:nodes(visible, [{connection_id, true}]) + catch error:badarg -> ok + end, + try erlang:nodes(visible, [{node_type, true}]) + catch error:badarg -> ok + end, + ok. + +id(X) -> + X. 
+ %% Test optimistic distribution flags toward pending connections (DFLAG_DIST_HOPEFULLY) optimistic_dflags(Config) when is_list(Config) -> ?Line Sender = start_relay_node(optimistic_dflags_sender, []), diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex 927d9841e9..57e6f29b4e 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index d9ba8328dd..b66d2341f7 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -36,7 +36,7 @@ -export([dmonitor_node/3]). -export([delay_trap/2]). -export([set_cookie/2, get_cookie/0]). --export([nodes/0]). +-export([nodes/0, nodes/1, nodes/2]). -export([integer_to_list/2]). -export([integer_to_binary/2]). @@ -191,7 +191,7 @@ is_list/1, is_map/1, is_number/1, is_pid/1, is_port/1, is_record/2, is_record/3, is_reference/1, is_tuple/1, load_module/2, load_nif/2, localtime_to_universaltime/2, make_fun/3, - make_tuple/2, make_tuple/3, nodes/1, open_port/2, + make_tuple/2, make_tuple/3, open_port/2, port_call/2, port_call/3, port_info/1, port_info/2, process_flag/2, process_info/2, send/2, send/3, seq_trace_info/1, setelement/3, @@ -2217,13 +2217,6 @@ make_tuple(_Arity,_InitialValue) -> make_tuple(_Arity,_DefaultValue,_InitList) -> erlang:nif_error(undefined). --spec nodes(Arg) -> Nodes when - Arg :: NodeType | [NodeType], - NodeType :: visible | hidden | connected | this | known, - Nodes :: [node()]. -nodes(_Arg) -> - erlang:nif_error(undefined). - -spec open_port(PortName, PortSettings) -> port() when PortName :: {spawn, Command :: string() | binary()} | {spawn_driver, Command :: string() | binary()} | @@ -3318,7 +3311,28 @@ yield() -> -spec nodes() -> Nodes when Nodes :: [node()]. nodes() -> - erlang:nodes(visible). + erlang:nif_error(undefined). 
+ +-spec nodes(Arg) -> Nodes when + Arg :: NodeType | [NodeType], + NodeType :: visible | hidden | connected | this | known, + Nodes :: [node()]. +nodes(_Arg) -> + erlang:nif_error(undefined). + +-spec nodes(Arg, InfoOpts) -> [NodeInfo] when + NodeType :: visible | hidden | connected | this | known, + Arg :: NodeType | [NodeType], + InfoOpts :: #{connection_id => boolean(), + node_type => boolean()}, + NodeTypeInfo :: visible | hidden | this | known, + ConnectionId :: undefined | integer(), + Info :: #{connection_id => ConnectionId, + node_type => NodeTypeInfo}, + NodeInfo :: {node(), Info}. + +nodes(_Args, _Opts) -> + erlang:nif_error(undefined). -spec disconnect_node(Node) -> boolean() | ignored when Node :: node(). diff --git a/erts/preloaded/src/erts.app.src b/erts/preloaded/src/erts.app.src index bcdaf400a1..ee6eabc0e2 100644 --- a/erts/preloaded/src/erts.app.src +++ b/erts/preloaded/src/erts.app.src @@ -42,7 +42,7 @@ {registered, []}, {applications, []}, {env, []}, - {runtime_dependencies, ["stdlib-3.13", "kernel-@OTP-17843@", "sasl-3.3"]} + {runtime_dependencies, ["stdlib-@OTP-17934@", "kernel-@OTP-17843:OTP-17934@", "sasl-3.3"]} ]}. %% vim: ft=erlang diff --git a/lib/kernel/doc/src/global.xml b/lib/kernel/doc/src/global.xml index d30f721448..d424034db9 100644 --- a/lib/kernel/doc/src/global.xml +++ b/lib/kernel/doc/src/global.xml @@ -70,6 +70,17 @@ enabled by default. </p> </warning> + <note> + <p> + None of the above services will be reliably delivered unless both of + the kernel parameters + <seeapp marker="kernel_app#connect_all"><c>connect_all</c></seeapp> + and <seeapp marker="kernel_app#prevent_overlapping_partitions"> + <c>prevent_overlapping_partitions</c></seeapp> are enabled. Calls to + the <c>global</c> API will, however, <i>not</i> fail even though one or + both of them are disabled. You will just get unreliable results. + </p> + </note> <p>These services are controlled through the process <c>global_name_server</c> that exists on every node. 
The global name server starts automatically when a node is started. @@ -108,9 +119,7 @@ example, if node <c>N1</c> connects to node <c>N2</c> (which is already connected to <c>N3</c>), the global name servers on the nodes <c>N1</c> and <c>N3</c> ensure that also <c>N1</c> - and <c>N3</c> are connected. If this is not desired, - command-line flag <c>-connect_all false</c> can be used (see also - <seecom marker="erts:erl#connect_all"><c>erl(1)</c></seecom>). + and <c>N3</c> are connected. In this case, the name registration service cannot be used, but the lock mechanism still works.</p> <p>If the global name server fails to connect nodes (<c>N1</c> and diff --git a/lib/kernel/doc/src/kernel_app.xml b/lib/kernel/doc/src/kernel_app.xml index 2bcb9f5d25..05f4daef4e 100644 --- a/lib/kernel/doc/src/kernel_app.xml +++ b/lib/kernel/doc/src/kernel_app.xml @@ -121,6 +121,32 @@ application. For more information about configuration parameters, see file <seefile marker="app"><c>app(4)</c></seefile>.</p> <taglist> + <tag><marker id="connect_all"/> + <c>connect_all = true | false</c></tag> + <item> + <p> + If enabled (<c>true</c>), which also is the default, + <seeerl marker="global"><c>global(3)</c></seeerl> will + actively connect to all nodes that become known to it. Note + that you also want to enable + <seeapp marker="#prevent_overlapping_partitions"> + <c>prevent_overlapping_partitions</c></seeapp> in order for + <c>global</c> to ensure that a fully connected network + is maintained. <c>prevent_overlapping_partitions</c> + will also prevent inconsistencies in <c>global</c>'s + name registration and locking. Note that + <c>prevent_overlapping_partitions</c> currently is disabled + by default. + </p> + <p> + The now deprecated command line argument + <seecom marker="erts:erl#connect_all"><c>-connect_all + <boolean></c></seecom> has the same effect as the + <c>connect_all</c> configuration parameter.
If this + configuration parameter is defined, it will override + the command line argument. + </p> + </item> <tag><c>distributed = [Distrib]</c></tag> <item> <p>Specifies which applications that are distributed and on which diff --git a/lib/kernel/doc/src/net_kernel.xml b/lib/kernel/doc/src/net_kernel.xml index df517517d5..6d8de1f20c 100644 --- a/lib/kernel/doc/src/net_kernel.xml +++ b/lib/kernel/doc/src/net_kernel.xml @@ -161,82 +161,121 @@ $ <input>erl -sname foobar</input></pre> are stopped. Two option lists are considered the same if they contain the same set of options.</p> - <p>As from Kernel version 2.11.4, and ERTS version - 5.5.4, the following is guaranteed:</p> - <list type="bulleted"> - <item><p><c>nodeup</c> messages are delivered before delivery - of any message from the remote node passed through the - newly established connection.</p></item> - <item><p><c>nodedown</c> messages are not delivered until all - messages from the remote node that have been passed - through the connection have been delivered.</p></item> - </list> - <p>Notice that this is <em>not</em> guaranteed for Kernel - versions before 2.11.4.</p> - <p>As from Kernel version 2.11.4, subscriptions can also be - made before the <c>net_kernel</c> server is started, that is, - <c>net_kernel:monitor_nodes/[1,2]</c> does not return - <c>ignored</c>.</p> - <p>As from Kernel version 2.13, and ERTS version - 5.7, the following is guaranteed:</p> - <list type="bulleted"> - <item><p><c>nodeup</c> messages are delivered after the + <p>Delivery guarantees of <c>nodeup</c>/<c>nodedown</c> messages:</p> + <list> + <item><p> + <c>nodeup</c> messages are delivered before delivery + of any signals from the remote node through the newly + established connection. + </p></item> + <item> + <p><c>nodedown</c> messages are delivered after all + the signals from the remote node over the connection + have been delivered. 
+ </p></item> + <item><p> + <c>nodeup</c> messages are delivered after the corresponding node appears in results from - <c>erlang:nodes/X</c>.</p></item> - <item><p><c>nodedown</c> messages are delivered after the + <c>erlang:nodes()</c>. + </p></item> + <item> + <p><c>nodedown</c> messages are delivered after the corresponding node has disappeared in results from - <c>erlang:nodes/X</c>.</p></item> + <c>erlang:nodes()</c>. + </p></item> + <item><p> + As of OTP 23.0, a <c>nodedown</c> message for a + connection being taken down will be delivered before a + <c>nodeup</c> message due to a new connection to the + same node. Prior to OTP 23.0, this was not + guaranteed to be the case. + </p></item> </list> - <p>Notice that this is <em>not</em> guaranteed for Kernel - versions before 2.13.</p> <p>The format of the node status change messages depends on <c><anno>Options</anno></c>. If <c><anno>Options</anno></c> is - <c>[]</c>, which is the default, the format is as follows:</p> - <code type="none"> + the empty list or if <c>net_kernel:monitor_nodes/1</c> is called, + the format is as follows:</p> + <code type="erl"> {nodeup, Node} | {nodedown, Node} Node = node()</code> - <p>If <c><anno>Options</anno></c> is not <c>[]</c>, the format is - as follows:</p> - <code type="none"> -{nodeup, Node, InfoList} | {nodedown, Node, InfoList} + <p> + When <c><anno>Options</anno></c> is the empty map or empty + list, the caller will only subscribe for status change messages + for visible nodes. That is, only nodes that appear in the + result of + <seemfa marker="erts:erlang#nodes/0"><c>erlang:nodes/0</c></seemfa>. + </p> + <p> + If <c><anno>Options</anno></c> equals anything other than the + empty list, the format of the status change messages is as follows: + </p> + <code type="erl"> +{nodeup, Node, Info} | {nodedown, Node, Info} Node = node() - InfoList = [{Tag, Val}]</code> - <p><c>InfoList</c> is a list of tuples. 
Its contents depends on - <c><anno>Options</anno></c>, see below.</p> - <p>Also, when <c>OptionList == []</c>, only visible nodes, that - is, nodes that appear in the result of - <seemfa marker="erts:erlang#nodes/0"><c>erlang:nodes/0</c></seemfa>, - are monitored.</p> - <p><c><anno>Option</anno></c> can be any of the following:</p> + Info = #{Tag => Val} | [{Tag, Val}]</code> + <p> + <c>Info</c> is either a map or a list of 2-tuples. Its content + depends on <c><anno>Options</anno></c>. If <c><anno>Options</anno></c> + is a map, <c>Info</c> will also be a map. If <c><anno>Options</anno></c> + is a list, <c>Info</c> will also be a list. + </p> + <p> + When <c><anno>Options</anno></c> is a map, currently + the following associations are allowed: + </p> <taglist> - <tag><c>{node_type, NodeType}</c></tag> + <tag><c>connection_id => boolean()</c></tag> + <item> + <p> + If the value of the association equals <c>true</c>, a + <c>connection_id => ConnectionId</c> association will be + included in the <c>Info</c> map where <c>ConnectionId</c> + is the connection identifier of the connection coming up + or going down. For more info about this connection + identifier see the documentation of + <seeerl marker="erts:erlang#connection_id">erlang:nodes/2</seeerl>. + </p> + </item> + <tag><c>node_type => <anno>NodeType</anno></c></tag> <item> <p>Valid values for <c>NodeType</c>:</p> <taglist> <tag><c>visible</c></tag> <item><p>Subscribe to node status change messages for visible - nodes only. The tuple <c>{node_type, visible}</c> is - included in <c>InfoList</c>.</p></item> + nodes only. The association <c>node_type => visible</c> will + be included in the <c>Info</c> map.</p></item> <tag><c>hidden</c></tag> <item><p>Subscribe to node status change messages for hidden - nodes only. The tuple <c>{node_type, hidden}</c> is - included in <c>InfoList</c>.</p></item> + nodes only. 
The association <c>node_type => hidden</c> will + be included in the <c>Info</c> map.</p></item> <tag><c>all</c></tag> <item><p>Subscribe to node status change messages for both - visible and hidden nodes. The tuple - <c>{node_type, visible | hidden}</c> is included in - <c>InfoList</c>.</p></item> + visible and hidden nodes. The association + <c>node_type => visible | hidden</c> will be included in + the <c>Info</c> map.</p></item> </taglist> + <p> + If no <c>node_type => <anno>NodeType</anno></c> association + is included in the <c><anno>Options</anno></c> map, the + caller will subscribe for status change messages for visible + nodes only, but <i>no</i> <c>node_type => visible</c> + association will be included in the <c>Info</c> map. + </p> </item> - <tag><c>nodedown_reason</c></tag> + <tag><c>nodedown_reason => boolean()</c></tag> <item> - <p>The tuple <c>{nodedown_reason, Reason}</c> is included in - <c>InfoList</c> in <c>nodedown</c> messages.</p> + <p> + If the value of the association equals <c>true</c>, a + <c>nodedown_reason => Reason</c> association will be + included in the <c>Info</c> map for <c>nodedown</c> + messages. + </p> + <marker id="nodedown_reasons"/> <p> <c>Reason</c> can, depending on which - distribution module or process that is used be any term, + distribution module or process that is used, be any term, but for the standard TCP distribution module it is - any of the following: + one of the following: </p> <taglist> <tag><c>connection_setup_failed</c></tag> @@ -263,6 +302,82 @@ $ <input>erl -sname foobar</input></pre> </taglist> </item> </taglist> + <p> + When <c><anno>Options</anno></c> is a list, currently + <c><anno>ListOption</anno></c> can be one of the following: + </p> + <taglist> + <tag><c>connection_id</c></tag> + <item> + <p> + A <c>{connection_id, ConnectionId}</c> tuple will be + included in <c>Info</c> where <c>ConnectionId</c> is the + connection identifier of the connection coming up or + going down. 
For more info about this connection identifier + see the documentation of + <seeerl marker="erts:erlang#connection_id">erlang:nodes/2</seeerl>. + </p> + </item> + <tag><c>{node_type, <anno>NodeType</anno>}</c></tag> + <item> + <p>Valid values for <c><anno>NodeType</anno></c>:</p> + <taglist> + <tag><c>visible</c></tag> + <item><p>Subscribe to node status change messages for visible + nodes only. The tuple <c>{node_type, visible}</c> will be + included in the <c>Info</c> list.</p></item> + <tag><c>hidden</c></tag> + <item><p>Subscribe to node status change messages for hidden + nodes only. The tuple <c>{node_type, hidden}</c> will be + included in the <c>Info</c> list.</p></item> + <tag><c>all</c></tag> + <item><p>Subscribe to node status change messages for both + visible and hidden nodes. The tuple + <c>{node_type, visible | hidden}</c> will be included in + the <c>Info</c> list.</p></item> + </taglist> + <p> + If no <c>{node_type, <anno>NodeType</anno>}</c> option + has been given, the caller will subscribe for status + change messages for visible nodes only, but <i>no</i> + <c>{node_type, visible}</c> tuple will be included in the + <c>Info</c> list. + </p> + </item> + <tag><c>nodedown_reason</c></tag> + <item> + <p> + The tuple <c>{nodedown_reason, Reason}</c> will be included + in the <c>Info</c> list for <c>nodedown</c> messages. + </p> + <p> + See the documentation of the + <seeerl marker="#nodedown_reasons"><c>nodedown_reason + => boolean()</c></seeerl> association above for information + about possible <c>Reason</c> values. + </p> + </item> + </taglist> + <p>Example:</p> + <code type="erl"> +(a@localhost)1> net_kernel:monitor_nodes(true, #{connection_id=>true, node_type=>all, nodedown_reason=>true}). +ok +(a@localhost)2> flush().
+Shell got {nodeup,b@localhost, + #{connection_id => 3067552,node_type => visible}} +Shell got {nodeup,c@localhost, + #{connection_id => 13892107,node_type => hidden}} +Shell got {nodedown,b@localhost, + #{connection_id => 3067552,node_type => visible, + nodedown_reason => connection_closed}} +Shell got {nodedown,c@localhost, + #{connection_id => 13892107,node_type => hidden, + nodedown_reason => net_tick_timeout}} +Shell got {nodeup,b@localhost, + #{connection_id => 3067553,node_type => visible}} +ok +(a@localhost)3> + </code> </desc> </func> diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl index 75a48e8ac4..c0981140d0 100644 --- a/lib/kernel/src/dist_util.erl +++ b/lib/kernel/src/dist_util.erl @@ -139,7 +139,7 @@ publish_flag(_, NameMeFlg, _) when (NameMeFlg band ?DFLAG_NAME_ME) =/= 0 -> ?DFLAG_NAME_ME; publish_flag(hidden, _, _) -> 0; -publish_flag(_, _, OtherNode) when is_atom(OtherNode) -> +publish_flag(_, _, OtherNode) -> case net_kernel:publish_on_node(OtherNode) of true -> ?DFLAG_PUBLISHED; diff --git a/lib/kernel/src/global.erl b/lib/kernel/src/global.erl index 503a8cd1ae..1be91b3f15 100644 --- a/lib/kernel/src/global.erl +++ b/lib/kernel/src/global.erl @@ -40,7 +40,7 @@ %% Internal exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, resolve_it/4]). + code_change/3, resolve_it/4, get_locker/0]). -export([info/0]). @@ -92,11 +92,22 @@ %% Vsn 7 - propagate global versions between nodes, so we always know %% versions of known nodes %% - optional "prevent overlapping partitions" fix supported +%% Vsn 8 - "verify connection" part of the protocol preventing +%% deadlocks in connection setup due to locker processes +%% being out of sync +%% - "prevent overlapping partitions" fix also for systems +%% configured to use global groups %% Current version of global does not support vsn 4 or earlier. --define(vsn, 7). +-define(vsn, 8). 
+%% Version when the "verify connection" part of the protocol +%% was introduced. +-define(verify_connect_vsn, 8). +%% Version where "prevent overlapping partitions" fix for global groups +%% was introduced. +-define(gg_pop_vsn, 8). %% Version when the "prevent overlapping partitions" fix was introduced. -define(pop_vsn, 7). %% Version when the "propagate global protocol versions" feature @@ -143,7 +154,9 @@ %% protocol version as value {pending, node()} => non_neg_integer(), %% Node currently being removed - {removing, node()} => yes + {removing, node()} => yes, + %% Connection id of connected nodes + {connection_id, node()} => integer() }, synced = [] :: [node()], resolvers = [], @@ -297,7 +310,7 @@ check_dupname(Name, Pid) -> _ -> S = "global: ~w registered under several names: ~tw\n", Names = [Name | [Name1 || {_Pid, Name1} <- PidNames]], - error_logger:error_msg(S, [Pid, Names]), + logger:log(error, S, [Pid, Names]), false end end. @@ -479,6 +492,40 @@ info() -> init([]) -> process_flag(trap_exit, true), + + %% Monitor all 'nodeup'/'nodedown' messages of visible nodes. + %% In case + %% + %% * no global group is configured, we use these as is. This + %% way we know that 'nodeup' comes before any traffic from + %% the node on the newly established connection and 'nodedown' + %% comes after any traffic on this connection from the node. + %% + %% * global group is configured, we ignore 'nodeup' and instead + %% rely on 'group_nodeup' messages passed by global_group and + %% filter 'nodedown' based on if the node is part of our group + %% or not. We need to be prepared for traffic from the node + %% on the newly established connection arriving before the + %% 'group_nodeup'. 'nodedown' will however not arrive until + %% all traffic from the node on this connection has arrived. + %% + %% In case a connection goes down and then up again, the + %% 'nodedown' for the old connection is nowadays guaranteed to + %% be delivered before the 'nodeup' for the new connection. 
+ %% + %% By keeping track of connection_id for all connections we + %% can differentiate between different instances of connections + %% to the same node. + ok = net_kernel:monitor_nodes(true, #{connection_id => true}), + + %% There are most likely no connected nodes at this stage, + %% but check to make sure... + Known = lists:foldl(fun ({N, #{connection_id := CId}}, Cs) -> + Cs#{{connection_id, N} => CId} + end, + #{}, + nodes(visible, #{connection_id => true})), + _ = ets:new(global_locks, [set, named_table, protected]), _ = ets:new(global_names, [set, named_table, protected, {read_concurrency, true}]), @@ -487,6 +534,7 @@ init([]) -> _ = ets:new(global_pid_names, [bag, named_table, protected]), _ = ets:new(global_pid_ids, [bag, named_table, protected]), _ = ets:new(global_lost_connections, [set, named_table, protected]), + _ = ets:new(global_node_resources, [set, named_table, protected]), %% This is for troubleshooting only. DoTrace = os:getenv("GLOBAL_HIGH_LEVEL_TRACE") =:= "TRUE", @@ -498,24 +546,37 @@ init([]) -> no_trace end, - Ca = case init:get_argument(connect_all) of - {ok, [["false"]]} -> - false; - _ -> - true + Ca = case application:get_env(kernel, connect_all) of + {ok, CaBool} when is_boolean(CaBool) -> + CaBool; + {ok, CaInvalid} -> + error({invalid_parameter_value, connect_all, CaInvalid}); + undefined -> + CaBool = case init:get_argument(connect_all) of + {ok, [["false" | _] | _]} -> + false; + _ -> + true + end, + ok = application:set_env(kernel, connect_all, CaBool, + [{timeout, infinity}]), + CaBool end, + POP = case application:get_env(kernel, prevent_overlapping_partitions) of - {ok, Bool} when Bool == true; Bool == false -> - Bool; - {ok, Invalid} -> + {ok, PopBool} when is_boolean(PopBool) -> + PopBool; + {ok, PopInvalid} -> error({invalid_parameter_value, prevent_overlapping_partitions, - Invalid}); + PopInvalid}); undefined -> false end, + S = #state{the_locker = start_the_locker(DoTrace), + known = Known, trace = T0, the_registrar = 
start_the_registrar(), conf = #conf{connect_all = Ca, @@ -579,6 +640,16 @@ init([]) -> %% which makes it possible to discard those messages and cancel the %% corresponding lock. %% +%% - The lockers begin locking operations as soon as the init_connect +%% messages have been exchanged and do not wait for init_connect_ack. +%% They could even complete before init_connect_ack messages are +%% received. The init_connect_ack messages are only there to confirm +%% that both nodes have the same view of which connect session is +%% ongoing. If lockers get out of sync, the lock will not be able +%% to be acquired on both nodes. The out of sync lock operation will +%% be detected when the init_connect_ack message is received and the +%% operation can be cancelled and then restarted. +%% %% Suppose nodes A and B connect, and C is connected to A. %% Here's the algorithm's flow: %% @@ -588,6 +659,8 @@ init([]) -> %% TheLocker ! {nodeup, ..., Node, ...} (there is one locker per node) %% B ! {init_connect, ..., {..., TheLockerAtA, ...}} %% << {init_connect, TheLockerAtB} +%% B ! {init_connect_ack, ...} +%% << {init_connect_ack, ...} %% [The lockers try to set the lock] %% << {lock_is_set, B, ...} %% [Now, lock is set in both partitions] @@ -598,6 +671,10 @@ init([]) -> %% << {resolved, B, ResolvedB, KnownAtB, ...} %% C ! {new_nodes, ResolvedAandB, [B]} %% +%% When cancelling a connect, the remote node is nowadays also +%% informed using: +%% B ! {cancel_connect, ...} +%% %% Node C %% ------ %% << {new_nodes, ResolvedOps, NewNodes} @@ -761,10 +838,10 @@ handle_call(stop, _From, S) -> {stop, normal, stopped, S}; handle_call(Request, From, S) -> - error_logger:warning_msg("The global_name_server " - "received an unexpected message:\n" - "handle_call(~tp, ~tp, _)\n", - [Request, From]), + logger:log(warning, "The global_name_server " + "received an unexpected message:\n" + "handle_call(~tp, ~tp, _)\n", + [Request, From]), {noreply, S}.
%%======================================================================== @@ -793,7 +870,7 @@ handle_cast({init_connect, Vsn, Node, InitMsg}, S0) -> _ -> Txt = io_lib:format("Illegal global protocol version ~p Node: ~p\n", [Vsn, Node]), - error_logger:info_report(lists:flatten(Txt)), + logger:log(info, lists:flatten(Txt)), S0 end, {noreply, S}; @@ -904,18 +981,22 @@ handle_cast({new_nodes, Node, Ops, Names_ext, Nodes, ExtraInfo}, S) -> %% %% We are in sync with this node (from the other node's known world). %%======================================================================== -handle_cast({in_sync, Node, _IsKnown}, #state{known = Known} = S) -> +handle_cast({in_sync, Node, _IsKnown}, #state{known = Known, + synced = Synced} = S0) -> %% Sent from global_name_server at Node (in the other partition). ?trace({'####', in_sync, {Node, _IsKnown}}), - lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers), - NewS = cancel_locker(Node, S, get({sync_tag_my, Node})), - reset_node_state(Node), - NSynced = case lists:member(Node, Synced = NewS#state.synced) of + lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S0#state.syncers), + NSynced = case lists:member(Node, Synced) of true -> Synced; false -> [Node | Synced] end, - {noreply, NewS#state{known = maps:remove({pending, Node}, Known), - synced = NSynced}}; + S1 = S0#state{known = maps:remove({pending, Node}, Known), + synced = NSynced}, + %% {pending, Node} removed, i.e., we won't send a cancel_connect + %% message to Node in cancel_locker()... + S2 = cancel_locker(Node, S1, get({sync_tag_my, Node})), + reset_node_state(Node), + {noreply, S2}; %% Called when Pid on other node crashed handle_cast({async_del_name, _Name, _Pid}, S) -> @@ -931,15 +1012,26 @@ handle_cast({async_del_lock, _ResourceId, _Pid}, S) -> %% R14A nodes and later do not send async_del_lock messages.
{noreply, S}; +handle_cast({lock_set, Pid, _Set, _HisKnown, MyTag} = Message, S) -> + #state{the_locker = Locker} = S, + Node = node(Pid), + case get({sync_tag_my, Node}) of + MyTag -> + Locker ! Message, + ok; + _NewMyTag -> + ok + end, + {noreply, S}; handle_cast({lock_set, _Pid, _Set, _HisKnown} = Message, S) -> #state{the_locker = Locker} = S, Locker ! Message, {noreply, S}; handle_cast(Request, S) -> - error_logger:warning_msg("The global_name_server " - "received an unexpected message:\n" - "handle_cast(~tp, _)\n", [Request]), + logger:log(warning, "The global_name_server " + "received an unexpected message:\n" + "handle_cast(~tp, _)\n", [Request]), {noreply, S}. %%======================================================================== @@ -960,77 +1052,106 @@ handle_info({'EXIT', Pid, _Reason}, S) when is_pid(Pid) -> Syncers = lists:delete(Pid, S#state.syncers), {noreply, S#state{syncers = Syncers}}; -handle_info({nodedown, Node}, S) when Node =:= S#state.node_name -> +handle_info({nodedown, Node, _Info}, S) when Node =:= S#state.node_name -> %% Somebody stopped the distribution dynamically - change %% references to old node name (Node) to new node name ('nonode@nohost') {noreply, change_our_node_name(node(), S)}; -handle_info({nodedown, Node}, S0) -> - ?trace({'####', nodedown, {node,Node}}), - S1 = trace_message(S0, {nodedown, Node}, []), - S = handle_nodedown(Node, S1, disconnected), - {noreply, S}; +handle_info({nodedown, Node, Info}, S0) -> + ?trace({'####', nodedown, {node,Node, Info}}), + S1 = trace_message(S0, {nodedown, Node, Info}, []), + NodeDownType = case global_group:participant(Node) of + true -> disconnected; + false -> ignore_node + end, + S2 = handle_nodedown(Node, S1, NodeDownType), + Known = maps:remove({connection_id, Node}, S2#state.known), + {noreply, S2#state{known = Known}}; handle_info({extra_nodedown, Node}, S0) -> ?trace({'####', extra_nodedown, {node,Node}}), S1 = trace_message(S0, {extra_nodedown, Node}, []), - S = 
handle_nodedown(Node, S1, disconnected), - {noreply, S}; + NodeDownType = case global_group:participant(Node) of + true -> disconnected; + false -> ignore_node + end, + %% Syncers wont notice this unless we send them + %% a nodedown... + lists:foreach(fun(Pid) -> Pid ! {nodedown, Node} end, S1#state.syncers), + S2 = handle_nodedown(Node, S1, NodeDownType), + Known = maps:remove({connection_id, Node}, S2#state.known), + {noreply, S2#state{known = Known}}; + +handle_info({group_nodedown, Node, CId}, + #state{known = Known, + conf = #conf{connect_all = CA, + prevent_over_part = POP}} = S0) -> + %% Node is either not part of our group or its global_group + %% configuration is not in sync with our configuration... + ?trace({'####', group_nodedown, {node,Node}}), + S1 = trace_message(S0, {group_nodedown, Node, CId}, []), + S = case maps:get({connection_id, Node}, Known, not_connected) of + CId -> + %% We cannot rely on 'DOWN' messages for locks and + %% names on this node since the connection can remain + %% up. Explicitly take care of them... + S2 = delete_node_resources(Node, S1), + + %% Syncers wont notice this unless we send them + %% a nodedown... + lists:foreach(fun(Pid) -> Pid ! {nodedown, Node} end, + S2#state.syncers), + NodeDownType = case (CA andalso POP + andalso global_group:member(Node)) of + false -> ignore_node; + true -> disconnected + end, + handle_nodedown(Node, S2, NodeDownType); -handle_info({ignore_node, Node}, S0) -> - %% global_group wants us to ignore this node... - ?trace({'####', ignore_node, {node,Node}}), - S1 = trace_message(S0, {ignore_node, Node}, []), - S = handle_nodedown(Node, S1, ignore_node), + _ -> + %% Corresponding connection no longer exist; ignore it... 
+ S1 + end, {noreply, S}; -handle_info({nodeup, Node}, S) when Node =:= node() -> +handle_info({nodeup, Node, _Info}, S) when Node =:= node() -> ?trace({'####', local_nodeup, {node, Node}}), %% Somebody started the distribution dynamically - change %% references to old node name ('nonode@nohost') to Node. {noreply, change_our_node_name(Node, S)}; -handle_info({nodeup, _Node}, - #state{conf = #conf{connect_all = false}} = S) -> - {noreply, S}; - -handle_info({nodeup, Node}, S0) -> - IsKnown = maps:is_key(Node, S0#state.known) or - %% This one is only for double nodeups (shouldn't occur!) - lists:keymember(Node, 1, S0#state.resolvers), - ?trace({'####', nodeup, {node,Node}, {isknown,IsKnown}}), - S1 = trace_message(S0, {nodeup, Node}, []), - case IsKnown of - true -> - {noreply, S1}; - false -> - resend_pre_connect(Node), - - %% erlang:unique_integer([monotonic]) is used as a tag to - %% separate different synch sessions - %% from each others. Global could be confused at bursty nodeups - %% because it couldn't separate the messages between the different - %% synch sessions started by a nodeup. - MyTag = erlang:unique_integer([monotonic]), - put({sync_tag_my, Node}, MyTag), - ?trace({sending_nodeup_to_locker, {node,Node},{mytag,MyTag}}), - S1#state.the_locker ! {nodeup, Node, MyTag}, - - %% In order to be compatible with unpatched R7 a locker - %% process was spawned. Vsn 5 is no longer compatible with - %% vsn 3 nodes, so the locker process is no longer needed. - %% The permanent locker takes its place. 
- NotAPid = no_longer_a_pid, - Locker = {locker, NotAPid, S1#state.known, S1#state.the_locker}, - InitC = {init_connect, {?vsn, MyTag}, node(), Locker}, - Rs = S1#state.resolvers, - ?trace({casting_init_connect, {node,Node},{initmessage,InitC}, - {resolvers,Rs}}), - gen_server:cast({global_name_server, Node}, InitC), - Resolver = start_resolver(Node, MyTag), - S = trace_message(S1, {new_resolver, Node}, [MyTag, Resolver]), - {noreply, S#state{resolvers = [{Node, MyTag, Resolver} | Rs]}} - end; +handle_info({nodeup, Node, #{connection_id := CId}}, + #state{known = Known, + conf = #conf{connect_all = false}} = S) -> + {noreply, S#state{known = Known#{{connection_id, Node} => CId}}}; + +handle_info({nodeup, Node, #{connection_id := CId}}, + #state{known = Known} = S0) -> + ?trace({'####', nodeup, {node,Node}}), + S1 = S0#state{known = Known#{{connection_id, Node} => CId}}, + S2 = trace_message(S1, {nodeup, Node}, []), + S3 = case global_group:group_configured() of + false -> + handle_nodeup(Node, S2); + true -> + %% We will later get a 'group_nodeup' message if + %% the node is in our group, and has a group + %% configuration that is in sync with our + %% configuration... + S2 + end, + {noreply, S3}; + +handle_info({group_nodeup, Node, CId}, #state{known = Known} = S0) -> + %% This message may arrive after other traffic from Node + %% on the current connection... + ?trace({'####', group_nodeup, {node,Node,CId}}), + S1 = trace_message(S0, {group_nodeup, Node, CId}, []), + S2 = case maps:get({connection_id, Node}, Known, not_connected) of + CId -> handle_nodeup(Node, S1); + _ -> S1 %% Late group_nodeup; connection has gone down... 
+ end, + {noreply, S2}; handle_info({whereis, Name, From}, S) -> _ = do_whereis(Name, From), @@ -1079,30 +1200,35 @@ handle_info({lost_connection, NodeA, XCreationA, OpIdA, NodeB} = Msg, NodeA end, - case is_node_potentially_known(RmNode, S0) of + case is_node_potentially_known(RmNode, S0) + andalso global_group:participant(RmNode) of false -> S0; true -> - case node_vsn(RmNode, S0) of - Vsn when Vsn < ?pop_vsn -> - erlang:disconnect_node(RmNode), - error_logger:warning_msg( - "'global' at node ~p disconnected old " - "node ~p in order to prevent overlapping " - "partitions", - [node(), RmNode]), - ok; - _Vsn -> - gns_volatile_send(RmNode, - {remove_connection, node()}), - error_logger:warning_msg( - "'global' at node ~p requested disconnect " - "from node ~p in order to prevent " - "overlapping partitions", - [node(), RmNode]), - ok - end, - handle_nodedown(RmNode, S0, remove_connection) + {NDType, What} = + case node_vsn(RmNode, S0) of + Vsn when Vsn < ?pop_vsn -> + net_kernel:async_disconnect(RmNode), + {remove_connection, "disconnected old"}; + Vsn -> + gns_volatile_send(RmNode, + {remove_connection, + node()}), + case global_group:member(RmNode) + andalso Vsn >= ?gg_pop_vsn of + true -> + {ignore_node, + "excluded global group member"}; + false -> + {remove_connection, + "requested disconnect from"} + end + end, + logger:log(warning, + "'global' at node ~p ~s node ~p in order " + "to prevent overlapping partitions", + [node(), What, RmNode]), + handle_nodedown(RmNode, S0, NDType) end end, @@ -1123,15 +1249,58 @@ handle_info({remove_connection, Node}, S0) -> false -> S0; true -> - erlang:disconnect_node(Node), - S1 = handle_nodedown(Node, S0, remove_connection), - error_logger:warning_msg( - "'global' at node ~p disconnected node ~p in order to " - "prevent overlapping partitions", [node(), Node]), + {NDType, What} + = case global_group:member(Node) of + true -> + global_group ! 
{disconnect_node, Node}, + {ignore_node, "excluded global group member"}; + false -> + net_kernel:async_disconnect(Node), + {remove_connection, "disconnected"} + end, + S1 = handle_nodedown(Node, S0, NDType), + logger:log(warning, + "'global' at node ~p ~s node ~p in order to " + "prevent overlapping partitions", + [node(), What, Node]), S1 end, {noreply, S2}; +handle_info({cancel_connect, Node, MyTag}, S0) -> + %% An ongoing connect was canceled by the other side... + %% + %% Message introduced in protocol version ?verify_connect_vsn + + S3 = case get({sync_tag_my, Node}) of + MyTag -> + S1 = cancel_locker(Node, S0, MyTag), + reset_node_state(Node), + S2 = S1#state{known = maps:remove({pending, Node}, + S1#state.known)}, + restart_connect(Node, MyTag, S2); + _ -> + S0 + end, + {noreply, S3}; + +handle_info({init_connect_ack, Node, HisMyTag, HisHisTag}, S0) -> + %% Message introduced in protocol version ?verify_connect_vsn + + MyMyTag = get({sync_tag_my, Node}), + MyHisTag = get({sync_tag_his, Node}), + S1 = case MyMyTag =:= HisMyTag andalso MyHisTag =:= HisHisTag of + true -> + S0; + false -> + %% Connection attempt out of sync; cancel + %% this attempt and start over... + + send_cancel_connect_message(Node, HisHisTag), + restart_connect(Node, MyMyTag, S0) + end, + {noreply, S1}; + handle_info({prepare_shutdown, From, Ref}, S0) -> %% Prevent lost_connection messages being sent due to %% connections being taken down during the shutdown... 
@@ -1171,14 +1340,15 @@ handle_info({trace_message, M, X}, S) -> {noreply, trace_message(S, M, X)}; handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S0) -> + delete_node_resource_info(MonitorRef), S1 = delete_lock(MonitorRef, S0), S = del_name(MonitorRef, S1), {noreply, S}; handle_info(Message, S) -> - error_logger:warning_msg("The global_name_server " - "received an unexpected message:\n" - "handle_info(~tp, _)\n", [Message]), + logger:log(warning, "The global_name_server " + "received an unexpected message:\n" + "handle_info(~tp, _)\n", [Message]), {noreply, S}. @@ -1188,6 +1358,47 @@ handle_info(Message, S) -> %%======================================================================== %%======================================================================== +save_node_resource_info(Node, Mon) -> + NewRes = case ets:lookup(global_node_resources, Node) of + [] -> #{Mon => ok}; + [{Node, OldRes}] -> OldRes#{Mon => ok} + end, + true = ets:insert(global_node_resources, [{Node, NewRes}, {Mon, Node}]), + ok. + +delete_node_resource_info(Mon) -> + case ets:lookup(global_node_resources, Mon) of + [] -> + ok; + [{Mon, Node}] -> + [{Node, OldRes}] = ets:lookup(global_node_resources, Node), + NewRes = maps:remove(Mon, OldRes), + true = ets:delete(global_node_resources, Mon), + case maps:size(NewRes) of + 0 -> + true = ets:delete(global_node_resources, Node), + ok; + _ -> + true = ets:insert(global_node_resources, {Node, NewRes}), + ok + end + end. + +delete_node_resources(Node, #state{} = State) -> + case ets:lookup(global_node_resources, Node) of + [] -> + State; + [{Node, Resources}] -> + true = ets:delete(global_node_resources, Node), + maps:fold(fun (Mon, ok, AccS0) -> + erlang:demonitor(Mon, [flush]), + true = ets:delete(global_node_resources, Mon), + AccS1 = delete_lock(Mon, AccS0), + del_name(Mon, AccS1) + end, + State, Resources) + end. + -define(HIGH_LEVEL_TRACE_INTERVAL, 500). 
% ms wait_high_level_trace() -> @@ -1280,25 +1491,90 @@ check_replies([], _Id, _Replies) -> %% Both nodes must have a lock before they are allowed to continue. %%======================================================================== init_connect(Vsn, Node, InitMsg, HisTag, HisVsn, - #state{resolvers = Resolvers, known = Known} = S) -> + #state{known = Known0} = S0) -> %% It is always the responsibility of newer versions to understand %% older versions of the protocol. - put({prot_vsn, Node}, Vsn), - put({sync_tag_his, Node}, HisTag), - case lists:keyfind(Node, 1, Resolvers) of - {Node, MyTag, _Resolver} -> - MyTag = get({sync_tag_my, Node}), % assertion - {locker, _NoLongerAPid, _HisKnown0, HisTheLocker} = InitMsg, - ?trace({init_connect,{histhelocker,HisTheLocker}}), - HisKnown = [], - S#state.the_locker ! {his_the_locker, HisTheLocker, - {Vsn,HisKnown}, S#state.known}; - false -> - ?trace({init_connect,{pre_connect,Node},{histag,HisTag}}), - put({pre_connect, Node}, {Vsn, InitMsg, HisTag}) - end, - S#state{known = maps:put({pending, Node}, HisVsn, Known)}. + try + S1 = case maps:is_key({pending, Node}, Known0) of + false -> + S0; + true -> + %% This should not be possible unless global group has + %% been configured. We got an already ongoing connection + %% setup with Node and get yet another connection attempt + %% from Node. + + if HisVsn < ?verify_connect_vsn -> + %% Old node that cannot handle this; give up... + erlang:disconnect_node(Node), + logger:log(error, + "'global' at node ~p got an out of " + "sync connection attempt from old " + "version ~p node ~p. Disconnecting " + "from it.", [node(), HisVsn, Node]), + throw({return, S0}); + + true -> + %% Cancel this new connection attempt as well + %% as the ongoing connection setup then restart + %% the connect by making a new connection + %% attempt... 
+ + send_cancel_connect_message(Node, HisTag), + MyOldTag = get({sync_tag_my, Node}), + restart_connect(Node, MyOldTag, S0) + end + end, + put({prot_vsn, Node}, Vsn), + put({sync_tag_his, Node}, HisTag), + case lists:keyfind(Node, 1, S1#state.resolvers) of + {Node, MyTag, _Resolver} -> + MyTag = get({sync_tag_my, Node}), % assertion + {locker, _NoLongerAPid, _HisKnown0, HisTheLocker} = InitMsg, + ?trace({init_connect,{histhelocker,HisTheLocker}}), + HisKnown = [], + S1#state.the_locker ! {his_the_locker, HisTheLocker, + {Vsn,HisKnown}, HisTag, MyTag}, + if HisVsn < ?verify_connect_vsn -> + ok; + true -> + gns_volatile_send(Node, + {init_connect_ack, node(), + HisTag, MyTag}) + end, + Known1 = S1#state.known, + S1#state{known = Known1#{{pending, Node} => HisVsn}}; + false -> + ?trace({init_connect,{pre_connect,Node},{histag,HisTag}}), + put({pre_connect, Node}, {Vsn, InitMsg, HisTag}), + S1 + end + + catch + throw:{return, S} -> + S + end. + +restart_connect(Node, MyTag, S0) -> + %% We got a mismatch in connection setup; cancel + %% current connection setup and try again... + + S1 = cancel_locker(Node, S0, MyTag), + reset_node_state(Node), + S2 = S1#state{known = maps:remove({pending, Node}, + S1#state.known)}, + + if is_integer(MyTag) -> + %% Node is up from our perspective; start a new resolver + %% and send a new init_connect... + handle_nodeup(Node, S2); + true -> + %% Node is down from our prespective; wait until + %% global_group say Node is up by sending us a + %% group_nodeup message... + S2 + end. 
%%======================================================================== %% In the simple case, we'll get lock_is_set before we get exchange, @@ -1348,13 +1624,15 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) -> HisKnownNodes = node_list(HisKnown), sync_others(HisKnownNodes), ExtraInfo = [{vsn,get({prot_vsn, Node})}, {lock, get({lock_id, Node})}], - S = do_ops(Ops, node(), Names_ext, ExtraInfo, S0), + S1 = do_ops(Ops, node(), Names_ext, ExtraInfo, S0), %% I am synced with Node, but not with HisKnown yet - lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers), - S3 = lists:foldl(fun(Node1, S1) -> - F = fun(Tag) -> cancel_locker(Node1,S1,Tag) end, - cancel_resolved_locker(Node1, F) - end, S, HisKnownNodes), + lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S1#state.syncers), + S2 = lists:foldl(fun(CnclNode, AccS) -> + F = fun(Tag, CnclS) -> + cancel_locker(CnclNode, CnclS, Tag) + end, + cancel_resolved_locker(CnclNode, F, AccS) + end, S1, HisKnownNodes), %% The locker that took the lock is asked to send %% the {new_nodes, ...} message. This ensures that %% {del_lock, ...} is received after {new_nodes, ...} @@ -1377,21 +1655,26 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) -> end, maps:keys(Known)) end, - F = fun(Tag) -> cancel_locker(Node, S3, Tag, NewNodesF) end, - S4 = cancel_resolved_locker(Node, F), + F = fun(Tag, CnclS) -> + cancel_locker(Node, CnclS, Tag, NewNodesF) + end, + S3 = cancel_resolved_locker(Node, F, S2), %% See (*) below... we're node b in that description - {AddedNodes, S5} = add_to_known(NewNodes, S4), - S5#state.the_locker ! {add_to_known, AddedNodes}, - S6 = trace_message(S5, {added, AddedNodes}, + {AddedNodes, S4} = add_to_known(NewNodes, S3), + S4#state.the_locker ! {add_to_known, AddedNodes}, + S5 = trace_message(S4, {added, AddedNodes}, [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]), - S6#state{synced = [Node | Synced]}. + S5#state{synced = [Node | Synced]}. 
-cancel_resolved_locker(Node, CancelFun) -> +cancel_resolved_locker(Node, CancelFun, #state{known = Known} = S0) -> Tag = get({sync_tag_my, Node}), ?trace({calling_cancel_locker,Tag,get()}), - S = CancelFun(Tag), + S1 = S0#state{known = maps:remove({pending, Node}, Known)}, + %% {pending, Node} removed, i.e., we wont send a cancel_connect + %% message to Node in CancelFun()... + S2 = CancelFun(Tag, S1), reset_node_state(Node), - S#state{known = maps:remove({pending, Node}, S#state.known)}. + S2. new_nodes(Ops, ConnNode, Names_ext, Nodes, ExtraInfo, S0) -> %% (*) This one requires some thought... @@ -1450,9 +1733,9 @@ resolver(Node, Tag) -> resend_pre_connect(Node) -> case erase({pre_connect, Node}) of {Vsn, InitMsg, HisTag} -> - gen_server:cast(self(), + gen_server:cast(self(), {init_connect, {Vsn, HisTag}, Node, InitMsg}); - _ -> + _ -> ok end. @@ -1510,6 +1793,7 @@ can_set_lock({ResourceId, LockRequesterId}) -> insert_lock({ResourceId, LockRequesterId}=Id, Pid, PidRefs, S) -> Ref = erlang:monitor(process, Pid), + save_node_resource_info(node(Pid), Ref), true = ets:insert(global_pid_ids, {Pid, ResourceId}), true = ets:insert(global_pid_ids, {Ref, ResourceId}), Lock = {ResourceId, LockRequesterId, [{Pid,Ref} | PidRefs]}, @@ -1532,6 +1816,7 @@ handle_del_lock({ResourceId, LockReqId}, Pid, S0) -> remove_lock(ResourceId, LockRequesterId, Pid, [{Pid,Ref}], Down, S0) -> ?trace({remove_lock_1, {id,ResourceId},{pid,Pid}}), + delete_node_resource_info(Ref), true = erlang:demonitor(Ref, [flush]), true = ets:delete(global_locks, ResourceId), true = ets:delete_object(global_pid_ids, {Pid, ResourceId}), @@ -1546,6 +1831,7 @@ remove_lock(ResourceId, LockRequesterId, Pid, PidRefs0, _Down, S) -> ?trace({remove_lock_2, {id,ResourceId},{pid,Pid}}), PidRefs = case lists:keyfind(Pid, 1, PidRefs0) of {Pid, Ref} -> + delete_node_resource_info(Ref), true = erlang:demonitor(Ref, [flush]), true = ets:delete_object(global_pid_ids, {Ref, ResourceId}), @@ -1614,8 +1900,8 @@ sync_other(Node, 
N) -> sync_other(Node, N - 1); {nodedown, Node} -> ?trace({missing_nodedown, {node, Node}}), - error_logger:warning_msg("global: ~w failed to connect to ~w\n", - [node(), Node]), + logger:log(warning, "'global' at ~w failed to connect to ~w\n", + [node(), Node]), global_name_server ! {extra_nodedown, Node} after 0 -> gen_server:cast({global_name_server,Node}, {in_sync,node(),true}) @@ -1625,6 +1911,7 @@ sync_other(Node, N) -> insert_global_name(Name, Pid, Method, FromPidOrNode, ExtraInfo, S) -> Ref = erlang:monitor(process, Pid), + save_node_resource_info(node(Pid), Ref), true = ets:insert(global_names, {Name, Pid, Method, Ref}), true = ets:insert(global_pid_names, {Pid, Name}), true = ets:insert(global_pid_names, {Ref, Name}), @@ -1696,6 +1983,7 @@ delete_global_name2(Name, S) -> end. delete_global_name2(Name, Pid, Ref, S) -> + delete_node_resource_info(Ref), true = erlang:demonitor(Ref, [flush]), delete_global_name(Name, Pid), ?trace({delete_global_name,{item,Name},{pid,Pid}}), @@ -1752,7 +2040,7 @@ delete_global_name(_Name, _Pid) -> do_trace % bool() }). --record(him, {node, locker, vsn, my_tag}). +-record(him, {node, locker, vsn, my_tag, his_tag}). start_the_locker(DoTrace) -> spawn_link(init_the_locker_fun(DoTrace)). @@ -1771,7 +2059,7 @@ init_the_locker_fun(DoTrace) -> loop_the_locker(S) -> ?trace({loop_the_locker,S}), receive - Message when element(1, Message) =/= nodeup -> + Message -> the_locker_message(Message, S) after 0 -> Timeout = @@ -1795,7 +2083,7 @@ loop_the_locker(S) -> end, S1 = S#multi{just_synced = false}, receive - Message when element(1, Message) =/= nodeup -> + Message -> the_locker_message(Message, S1) after Timeout -> case is_global_lock_set() of @@ -1807,51 +2095,36 @@ loop_the_locker(S) -> end end. 
-the_locker_message({his_the_locker, HisTheLocker, HisKnown0, _MyKnown}, S) -> - ?trace({his_the_locker, HisTheLocker, {node,node(HisTheLocker)}}), +the_locker_message({his_the_locker, HisTheLocker, HisKnown0, HisTag, MyTag} = _HtlMsg, S) -> + ?trace({the_locker, his_the_locker, {node,node(HisTheLocker)}, _HtlMsg}), {HisVsn, _HisKnown} = HisKnown0, true = HisVsn > 4, - receive - {nodeup, Node, MyTag} when node(HisTheLocker) =:= Node -> - ?trace({the_locker_nodeup, {node,Node},{mytag,MyTag}}), - Him = #him{node = node(HisTheLocker), my_tag = MyTag, - locker = HisTheLocker, vsn = HisVsn}, - loop_the_locker(add_node(Him, S)); - {cancel, Node, _Tag, no_fun} when node(HisTheLocker) =:= Node -> - loop_the_locker(S) - after 60000 -> - ?trace({nodeupnevercame, node(HisTheLocker)}), - error_logger:error_msg("global: nodeup never came ~w ~w\n", - [node(), node(HisTheLocker)]), - loop_the_locker(S#multi{just_synced = false}) - end; -the_locker_message({cancel, _Node, undefined, no_fun}, S) -> - ?trace({cancel_the_locker, undefined, {node,_Node}}), + Him = #him{node = node(HisTheLocker), my_tag = MyTag, + his_tag = HisTag, locker = HisTheLocker, vsn = HisVsn}, + loop_the_locker(add_node(Him, S)); +the_locker_message({cancel, _Node, undefined, no_fun} = _CMsg, S) -> + ?trace({cancel_the_locker, undefined, {node,_Node}, _CMsg}), %% If we actually cancel something when a cancel message with the %% tag 'undefined' arrives, we may be acting on an old nodedown, %% to cancel a new nodeup, so we can't do that. 
loop_the_locker(S); -the_locker_message({cancel, Node, Tag, no_fun}, S) -> +the_locker_message({cancel, Node, Tag, no_fun} = _CMsg, S) -> ?trace({the_locker, cancel, {multi,S}, {tag,Tag},{node,Node}}), - receive - {nodeup, Node, Tag} -> - ?trace({cancelnodeup2, {node,Node},{tag,Tag}}), - ok - after 0 -> - ok - end, - loop_the_locker(remove_node(Node, S)); -the_locker_message({lock_set, _Pid, false, _}, S) -> - ?trace({the_locker, spurious, {node,node(_Pid)}}), + loop_the_locker(remove_node(Node, Tag, S)); +the_locker_message({lock_set, _Pid, false, _, _} = _Msg, S) -> + ?trace({the_locker, spurious, {node,node(_Pid)}, _Msg}), + loop_the_locker(S); +the_locker_message({lock_set, _Pid, false, _} = _Msg, S) -> + ?trace({the_locker, spurious, {node,node(_Pid)}, _Msg}), loop_the_locker(S); -the_locker_message({lock_set, Pid, true, _HisKnown}, S) -> +the_locker_message({lock_set, Pid, true, _HisKnown, MyTag} = _Msg, S) -> Node = node(Pid), - ?trace({the_locker, self(), spontaneous, {node,Node}}), + ?trace({the_locker, self(), spontaneous, {node,Node}, _Msg}), case find_node_tag(Node, S) of - {true, MyTag, HisVsn} -> + {true, MyTag, HisVsn, HisTag} -> LockId = locker_lock_id(Pid, HisVsn), {IsLockSet, S1} = lock_nodes_safely(LockId, [], S), - send_lock_set(S1, IsLockSet, Pid, HisVsn), + send_lock_set(S1, IsLockSet, Pid, HisVsn, HisTag), Known2 = [node() | S1#multi.known], ?trace({the_locker, spontaneous, {known2, Known2}, {node,Node}, {is_lock_set,IsLockSet}}), @@ -1865,20 +2138,25 @@ the_locker_message({lock_set, Pid, true, _HisKnown}, S) -> %% Should the other locker's node die, %% global_name_server will receive nodedown, and %% then send {cancel, Node, Tag}. 
- receive - {cancel, Node, _Tag, Fun} -> - ?trace({cancel_the_lock,{node,Node}}), - call_fun(Fun), - delete_global_lock(LockId, Known2) - end, + wait_cancel_lock(Node, LockId, MyTag, Known2, + the_locker_message_wait_cancel, S1), S2 = S1#multi{just_synced = true}, - loop_the_locker(remove_node(Node, S2)); + loop_the_locker(remove_node(Node, MyTag, S2)); false -> loop_the_locker(S1#multi{just_synced = false}) end; false -> ?trace({the_locker, not_there, {node,Node}}), - send_lock_set(S, false, Pid, _HisVsn=5), + send_lock_set(S, false, Pid, _HisVsn=5, 0), + loop_the_locker(S) + end; +the_locker_message({lock_set, Pid, true, HisKnown}, S) -> + case find_node_tag(node(Pid), S) of + {true, MyTag, _HisVsn, _HisTag} -> + the_locker_message({lock_set, Pid, true, HisKnown, MyTag}, S); + false -> + ?trace({the_locker, not_there, {node,Node}}), + send_lock_set(S, false, Pid, _HisVsn=5, 0), loop_the_locker(S) end; the_locker_message({add_to_known, Nodes}, S) -> @@ -1889,11 +2167,23 @@ the_locker_message({remove_from_known, Node}, S) -> loop_the_locker(S1); the_locker_message({do_trace, DoTrace}, S) -> loop_the_locker(S#multi{do_trace = DoTrace}); +the_locker_message({get_state, _, _} = Msg, S) -> + get_state_reply(Msg, the_locker_message, S), + loop_the_locker(S); the_locker_message(Other, S) -> unexpected_message(Other, locker), ?trace({the_locker, {other_msg, Other}}), loop_the_locker(S). +get_state_reply({get_state, From, Ref}, Where, S) -> + %% Ref should always be in first element of the reply! + From ! {Ref, Where, S}, + ok. + +get_locker() -> + #state{the_locker = TheLocker} = info(), + TheLocker. + %% Requests from nodes on the local host are chosen before requests %% from other nodes. This should be a safe optimization. 
select_node(S) -> @@ -1910,7 +2200,7 @@ select_node(S) -> true -> Him = random_element(Others2), #him{locker = HisTheLocker, vsn = HisVsn, - node = Node, my_tag = MyTag} = Him, + node = Node, my_tag = MyTag, his_tag = HisTag} = Him, HisNode = [Node], Us = [node() | HisNode], LockId = locker_lock_id(HisTheLocker, HisVsn), @@ -1920,7 +2210,7 @@ select_node(S) -> case IsLockSet of true -> Known1 = Us ++ S2#multi.known, - send_lock_set(S2, true, HisTheLocker, HisVsn), + send_lock_set(S2, true, HisTheLocker, HisVsn, HisTag), S3 = lock_is_set(S2, Him, MyTag, Known1, LockId), loop_the_locker(S3); false -> @@ -1928,9 +2218,13 @@ select_node(S) -> end end. -send_lock_set(S, IsLockSet, HisTheLocker, Vsn) -> +send_lock_set(S, IsLockSet, HisTheLocker, Vsn, HisTag) -> ?trace({sending_lock_set, self(), {his,HisTheLocker}}), - Message = {lock_set, self(), IsLockSet, S#multi.known}, + Message = if Vsn < ?verify_connect_vsn -> + {lock_set, self(), IsLockSet, S#multi.known}; + true -> + {lock_set, self(), IsLockSet, S#multi.known, HisTag} + end, if Vsn < 6 -> HisTheLocker ! Message, @@ -2021,7 +2315,8 @@ update_locker_known(Upd, S) -> {remove, Node} -> lists:delete(Node, S#multi.known) end, TheBoss = the_boss([node() | Known]), - S#multi{known = Known, the_boss = TheBoss}. + NewS = S#multi{known = Known, the_boss = TheBoss}, + NewS. random_element(L) -> E = abs(erlang:monotonic_time() @@ -2034,39 +2329,34 @@ exclude_known(Others, Known) -> lock_is_set(S, Him, MyTag, Known1, LockId) -> Node = Him#him.node, receive + {lock_set, P, true, _, MyTag} when node(P) =:= Node -> + lock_is_set_true_received(S, Him, MyTag, Known1, LockId, P); + {lock_set, P, true, _, _OldMyTag} when node(P) =:= Node -> + lock_is_set(S, Him, MyTag, Known1, LockId); {lock_set, P, true, _} when node(P) =:= Node -> - gen_server:cast(global_name_server, - {lock_is_set, Node, MyTag, LockId}), - ?trace({lock_sync_done, {p,P, node(P)}, {me,self()}}), - - %% Wait for global to tell us to remove lock. 
Should the - %% other locker's node die, global_name_server will - %% receive nodedown, and then send {cancel, Node, Tag, Fun}. - receive - {cancel, Node, _, Fun} -> - ?trace({lock_set_loop, {known1,Known1}}), - call_fun(Fun), - delete_global_lock(LockId, Known1) - end, - S#multi{just_synced = true, - local = lists:delete(Him, S#multi.local), - remote = lists:delete(Him, S#multi.remote)}; - {lock_set, P, false, _} when node(P) =:= Node -> + lock_is_set_true_received(S, Him, MyTag, Known1, LockId, P); + {lock_set, P, false, _, MyTag} when node(P) =:= Node -> ?trace({not_both_set, {node,Node},{p, P},{known1,Known1}}), _ = locker_trace(S, rejected, Known1), delete_global_lock(LockId, Known1), S; - {cancel, Node, _, Fun} -> - ?trace({the_locker, cancel2, {node,Node}}), - call_fun(Fun), + {lock_set, P, false, _, _OldMyTag} when node(P) =:= Node -> + lock_is_set(S, Him, MyTag, Known1, LockId); + {lock_set, P, false, _} when node(P) =:= Node -> + ?trace({not_both_set, {node,Node},{p, P},{known1,Known1}}), _ = locker_trace(S, rejected, Known1), delete_global_lock(LockId, Known1), - remove_node(Node, S); - {'EXIT', _, _} -> - ?trace({the_locker, exit, {node,Node}}), + S; + {cancel, Node, MyTag, Fun} = _CMsg -> + ?trace({the_locker, cancel2, {node,Node}, _CMsg}), + call_fun(Fun), _ = locker_trace(S, rejected, Known1), delete_global_lock(LockId, Known1), - S + remove_node(Node, MyTag, S); + {get_state, _, _} = Msg -> + get_state_reply(Msg, lock_is_set, S), + lock_is_set(S, Him, MyTag, Known1, LockId) + %% There used to be an 'after' clause (OTP-4902), but it is %% no longer needed: %% OTP-5770. Version 5 of the protocol. Deadlock can no longer @@ -2077,6 +2367,35 @@ lock_is_set(S, Him, MyTag, Known1, LockId) -> %% with the first partition. end. 
+lock_is_set_true_received(S, Him, MyTag, Known1, LockId, P) -> + Node = node(P), + gen_server:cast(global_name_server, + {lock_is_set, Node, MyTag, LockId}), + ?trace({lock_sync_done, {p,P, node(P)}, {me,self()}}), + + %% Wait for global to tell us to remove lock. Should the + %% other locker's node die, global_name_server will + %% receive nodedown, and then send {cancel, Node, Tag, Fun}. + wait_cancel_lock(Node, LockId, MyTag, Known1, + lock_is_set_wait_cancel, S), + S#multi{just_synced = true, + local = lists:delete(Him, S#multi.local), + remote = lists:delete(Him, S#multi.remote)}. + +wait_cancel_lock(Node, LockId, MyTag, Known, Where, S) -> + receive + {cancel, Node, MyTag, Fun} = _CMsg -> + ?trace({cancel_the_lock, {where, _Where}, + {node,Node}, {known, Known}, _CMsg}), + call_fun(Fun), + delete_global_lock(LockId, Known); + + {get_state, _, _} = Msg -> + get_state_reply(Msg, Where, S), + wait_cancel_lock(Node, LockId, MyTag, Known, Where, S) + end. + + %% The locker does the {new_nodes, ...} call before removing the lock. call_fun(no_fun) -> ok; @@ -2104,21 +2423,21 @@ find_node_tag(Node, S) -> find_node_tag2(_Node, []) -> false; -find_node_tag2(Node, [#him{node = Node, my_tag = MyTag, vsn = HisVsn} | _]) -> - {true, MyTag, HisVsn}; +find_node_tag2(Node, [#him{node = Node, my_tag = MyTag, vsn = HisVsn, his_tag = HisTag} | _]) -> + {true, MyTag, HisVsn, HisTag}; find_node_tag2(Node, [_E | Rest]) -> find_node_tag2(Node, Rest). -remove_node(Node, S) -> - S#multi{local = remove_node2(Node, S#multi.local), - remote = remove_node2(Node, S#multi.remote)}. +remove_node(Node, Tag, S) -> + S#multi{local = remove_node2(Node, Tag, S#multi.local), + remote = remove_node2(Node, Tag, S#multi.remote)}. -remove_node2(_Node, []) -> +remove_node2(_Node, _Tag, []) -> []; -remove_node2(Node, [#him{node = Node} | Rest]) -> +remove_node2(Node, Tag, [#him{node = Node, my_tag = Tag} | Rest]) -> Rest; -remove_node2(Node, [E | Rest]) -> - [E | remove_node2(Node, Rest)]. 
+remove_node2(Node, Tag, [E | Rest]) -> + [E | remove_node2(Node, Tag, Rest)]. add_node(Him, S) -> case is_node_local(Him#him.node) of @@ -2145,10 +2464,12 @@ cancel_locker(Node, S, Tag) -> cancel_locker(Node, S, Tag, no_fun). cancel_locker(Node, S, Tag, ToBeRunOnLockerF) -> - S#state.the_locker ! {cancel, Node, Tag, ToBeRunOnLockerF}, + CMsg = {cancel, Node, Tag, ToBeRunOnLockerF}, + S#state.the_locker ! CMsg, Resolvers = S#state.resolvers, ?trace({cancel_locker, {node,Node},{tag,Tag}, {sync_tag_my, get({sync_tag_my, Node})},{resolvers,Resolvers}}), + send_cancel_connect(Node, Tag, S), case lists:keyfind(Node, 1, Resolvers) of {_, Tag, Resolver} -> ?trace({{resolver, Resolver}}), @@ -2159,6 +2480,34 @@ cancel_locker(Node, S, Tag, ToBeRunOnLockerF) -> S end. +send_cancel_connect(Node, MyTag, #state{known = Known}) -> + %% Send a cancel_connect message if we got an ongoing + %% connect... + try + case maps:find({pending, Node}, Known) of + {ok, Vsn} when Vsn < ?verify_connect_vsn -> + throw(ignore); + error -> + throw(ignore); + {ok, _Vsn} -> + ok + end, + case get({sync_tag_my, Node}) of + MyTag -> ok; + _ -> throw(ignore) + end, + send_cancel_connect_message(Node, get({sync_tag_his, Node})) + catch + throw:ignore -> + ok + end. + +send_cancel_connect_message(Node, HisTag) -> + Msg = {cancel_connect, node(), HisTag}, + To = {global_name_server, Node}, + _ = erlang:send(To, Msg, [noconnect]), + ok. 
+ reset_node_state(Node) -> ?trace({{node,Node}, reset_node_state, get()}), erase({wait_lock, Node}), @@ -2193,15 +2542,15 @@ exchange_names([{Name, Pid, Method} | Tail], Node, Ops, Res) -> Op = {delete, Name}, exchange_names(Tail, Node, [Op | Ops], [Op | Res]); {badrpc, Badrpc} -> - error_logger:info_msg("global: badrpc ~w received when " - "conflicting name ~tw was found\n", - [Badrpc, Name]), + logger:log(info, "global: badrpc ~w received when " + "conflicting name ~tw was found\n", + [Badrpc, Name]), Op = {insert, {Name, Pid, Method}}, exchange_names(Tail, Node, [Op | Ops], Res); Else -> - error_logger:info_msg("global: Resolve method ~w for " - "conflicting name ~tw returned ~tw\n", - [Method, Name, Else]), + logger:log(info, "global: Resolve method ~w for " + "conflicting name ~tw returned ~tw\n", + [Method, Name, Else]), Op = {delete, Name}, exchange_names(Tail, Node, [Op | Ops], [Op | Res]) end; @@ -2229,8 +2578,8 @@ minmax(P1,P2) -> Pid2 :: pid(). random_exit_name(Name, Pid, Pid2) -> {Min, Max} = minmax(Pid, Pid2), - error_logger:info_msg("global: Name conflict terminating ~tw\n", - [{Name, Max}]), + logger:log(info, "global: Name conflict terminating ~tw\n", + [{Name, Max}]), exit(Max, kill), Min. @@ -2296,6 +2645,49 @@ pid_locks(Ref) -> ref_is_locking(Ref, PidRefs) -> lists:keyfind(Ref, 2, PidRefs) =/= false. +handle_nodeup(Node, #state{the_locker = TheLocker, + resolvers = Rs, + known = Known} = S0) -> + case maps:is_key(Node, Known) orelse lists:keymember(Node, 1, Rs) of + true -> + %% Already known or in progress... + S0; + + false -> + %% Make ourselves known to Node... + + %% + %% In case a global group is configured, we might already + %% have received an init_connect message from Node (the + %% group_nodeup message may be delivered after early traffic + %% over the channel have been delivered). If we already have + %% gotten an init_connect, resend it to ourselves... 
+ %% + resend_pre_connect(Node), + + %% erlang:unique_integer([monotonic]) is used as a tag to + %% separate different synch sessions + %% from each others. Global could be confused at bursty nodeups + %% because it couldn't separate the messages between the different + %% synch sessions started by a nodeup. + MyTag = erlang:unique_integer([monotonic]), + put({sync_tag_my, Node}, MyTag), + + %% In order to be compatible with unpatched R7 a locker + %% process was spawned. Vsn 5 is no longer compatible with + %% vsn 3 nodes, so the locker process is no longer needed. + %% The permanent locker takes its place. + NotAPid = no_longer_a_pid, + Locker = {locker, NotAPid, Known, TheLocker}, + InitC = {init_connect, {?vsn, MyTag}, node(), Locker}, + ?trace({casting_init_connect, {node,Node},{initmessage,InitC}, + {resolvers,Rs}}), + gen_server:cast({global_name_server, Node}, InitC), + Resolver = start_resolver(Node, MyTag), + S1 = trace_message(S0, {new_resolver, Node}, [MyTag, Resolver]), + S1#state{resolvers = [{Node, MyTag, Resolver} | Rs]} + end. + handle_nodedown(Node, #state{synced = Syncs, known = Known0} = S, What) -> %% DOWN signals from monitors have removed locks and registered names. @@ -2316,9 +2708,9 @@ handle_nodedown(Node, #state{synced = Syncs, Known0 end end, - NewS#state{known = maps:remove(Node, - maps:remove({pending, Node}, Known1)), - synced = lists:delete(Node, Syncs)}. + Known2 = maps:remove({pending, Node}, Known1), + Known3 = maps:remove(Node, Known2), + NewS#state{known = Known3, synced = lists:delete(Node, Syncs)}. inform_connection_loss(Node, #state{conf = #conf{connect_all = true, @@ -2571,7 +2963,7 @@ check_sync_nodes() -> {ok, NodesNG} -> %% global_groups parameter is defined, we are not allowed to sync %% with nodes not in our own global group. - intersection(nodes(), NodesNG); + intersection(NodesNG, nodes()); {error, _} = Error -> Error end. 
@@ -2583,7 +2975,7 @@ check_sync_nodes(SyncNodes) -> {ok, NodesNG} -> %% global_groups parameter is defined, we are not allowed to sync %% with nodes not in our own global group. - OwnNodeGroup = intersection(nodes(), NodesNG), + OwnNodeGroup = intersection(NodesNG, nodes()), IllegalSyncNodes = (SyncNodes -- [node() | OwnNodeGroup]), case IllegalSyncNodes of [] -> SyncNodes; @@ -2624,9 +3016,9 @@ unexpected_message({'EXIT', _Pid, _Reason}, _What) -> %% global_name_server died ok; unexpected_message(Message, What) -> - error_logger:warning_msg("The global_name_server ~w process " - "received an unexpected message:\n~tp\n", - [What, Message]). + logger:log(warning, "The global_name_server ~w process " + "received an unexpected message:\n~tp\n", + [What, Message]). %%% Utilities diff --git a/lib/kernel/src/global_group.erl b/lib/kernel/src/global_group.erl index fc90291ee9..69f3f367eb 100644 --- a/lib/kernel/src/global_group.erl +++ b/lib/kernel/src/global_group.erl @@ -36,26 +36,29 @@ -export([send/3]). -export([whereis_name/1]). -export([whereis_name/2]). --export([global_groups_changed/1]). --export([global_groups_added/1]). --export([global_groups_removed/1]). -export([sync/0]). --export([ng_add_check/2, ng_add_check/3]). - -export([info/0]). --export([registered_names_test/1]). --export([send_test/2]). --export([whereis_name_test/1]). --export([get_own_nodes/0, get_own_nodes_with_errors/0]). --export([publish_on_nodes/0]). --export([config_scan/1, config_scan/2]). +%% Kernel application internal exports -%% Internal exports --export([sync_init/4]). +-export([global_groups_changed/1, + global_groups_added/1, + global_groups_removed/1, + get_own_nodes/0, + get_own_nodes_with_errors/0, + member/1, + participant/1, + publish/2, + group_configured/0]). +%% Internal exports +-export([ng_add_check/2, + ng_add_check/3, + registered_names_test/1, + send_test/2, + whereis_name_test/1]). --define(cc_vsn, 2). +-define(cc_vsn, 3). 
%%%==================================================================================== @@ -67,7 +70,7 @@ | {GroupName :: group_name(), PublishType :: publish_type(), [node()]}. - +-type node_state() :: 'sync' | 'sync_error' | 'no_contact'. %%%==================================================================================== %%% The state of the global_group process %%% @@ -78,22 +81,45 @@ %%% no_contact = Nodes which we haven't had contact with yet %%% sync_error = Nodes which we haven't had contact with yet %%% other_grps = list of other global group names and nodes, [{otherName, [Node]}] -%%% node_name = Own node %%% monitor = List of Pids requesting nodeup/nodedown %%%==================================================================================== -record(state, {sync_state = no_conf :: sync_state(), - connect_all :: boolean(), group_name = [] :: group_name() | [], - nodes = [] :: [node()], - no_contact = [] :: [node()], - sync_error = [], + nodes = #{} :: #{node() => node_state()}, other_grps = [], - node_name = node() :: node(), monitor = [], - publish_type = normal :: publish_type(), - group_publish_type = normal :: publish_type()}). - + group_publish_type = normal :: publish_type(), + connections :: #{node() => non_neg_integer()}, + rpc_requests :: #{reference() => term()}, + config_check :: 'undefined' + | {reference(), + #{node() => reference()}}}). + +%%%==================================================================================== +%%% Configuration record. 
Returned by: +%%% * fetch_new_group_conf/1,2 -- Fetch and install new configuration +%%% * new_group_conf/2,3 -- Install new configuration +%%% * lookup_group_conf/1 -- Lookup installed configuration (will fetch +%%% and install configuration if it has not +%%% been initialized) +%%% * alive_state_change_group_conf/1 -- Adjust configuration according to alive +%%% state +%%%==================================================================================== +-record(gconf, {parameter_value = invalid :: [group_tuple()] + | undefined + | invalid, + node_name :: node() | 'undefined', + group_name = [] :: group_name() | [], + group_publish_type = normal :: publish_type(), + group_list = [] :: [node()], + group_map = all :: 'all' | #{ node() => ok }, + other_groups = [] :: [group_tuple()], + state = no_conf :: 'no_conf' + | 'conf' + | {error, + term(), + [group_tuple()]}}). %%%==================================================================================== %%% External exported @@ -165,12 +191,6 @@ global_groups_removed(NewPara) -> sync() -> request(sync). -ng_add_check(Node, OthersNG) -> - ng_add_check(Node, normal, OthersNG). - -ng_add_check(Node, PubType, OthersNG) -> - request({ng_add_check, Node, PubType, OthersNG}). - -type info_item() :: {'state', State :: sync_state()} | {'own_group_name', GroupName :: group_name()} | {'own_group_nodes', Nodes :: [node()]} @@ -184,6 +204,14 @@ ng_add_check(Node, PubType, OthersNG) -> info() -> request(info, 3000). +%% global_group internal... + +ng_add_check(Node, OthersNG) -> + ng_add_check(Node, normal, OthersNG). + +ng_add_check(Node, PubType, OthersNG) -> + request({ng_add_check, Node, PubType, OthersNG}). + %% ==== ONLY for test suites ==== registered_names_test(Arg) -> request({registered_names_test, Arg}). @@ -214,8 +242,8 @@ request(Req, Time) -> %%% Otherwise a sync process is started to check that all nodes in the own global %%% group have the same configuration. 
This is done by sending 'conf_check' to all %%% other nodes and requiring 'conf_check_result' back. -%%% If the nodes are not in agreement of the configuration the global_group process -%%% will remove these nodes from the #state.nodes list. This can be a normal case +%%% If the nodes are not in agreement of the configuration the global_group process, +%%% these nodes will be set in a state different than 'sync'. This can be a normal case %%% at release upgrade when all nodes are not yet upgraded. %%% %%% It is possible to manually force a sync of the global_group. This is done for @@ -233,49 +261,65 @@ stop() -> gen_server:call(global_group, stop, infinity). init([]) -> process_flag(priority, max), - ok = net_kernel:monitor_nodes(true), + ok = net_kernel:monitor_nodes(true, #{connection_id => true}), put(registered_names, [undefined]), put(send, [undefined]), put(whereis_name, [undefined]), process_flag(trap_exit, true), - Ca = case init:get_argument(connect_all) of - {ok, [["false"]]} -> - false; - _ -> - true - end, - PT = publish_arg(), - case application:get_env(kernel, global_groups) of - undefined -> - update_publish_nodes(PT), - {ok, #state{publish_type = PT, - connect_all = Ca}}; - {ok, []} -> - update_publish_nodes(PT), - {ok, #state{publish_type = PT, - connect_all = Ca}}; - {ok, NodeGrps} -> - {DefGroupName, PubTpGrp, DefNodes, DefOther} = - case catch config_scan(NodeGrps, publish_type) of - {error, _Error2} -> - update_publish_nodes(PT), - exit({error, {'invalid global_groups definition', NodeGrps}}); - {DefGroupNameT, PubType, DefNodesT, DefOtherT} -> - update_publish_nodes(PT, {PubType, DefNodesT}), - %% First disconnect any nodes not belonging to our own group - disconnect_nodes(nodes(connected) -- DefNodesT), - lists:foreach(fun(Node) -> - erlang:monitor_node(Node, true) - end, - DefNodesT), - {DefGroupNameT, PubType, lists:delete(node(), DefNodesT), DefOtherT} - end, - {ok, #state{publish_type = PT, group_publish_type = PubTpGrp, - sync_state = 
synced, group_name = DefGroupName, - no_contact = lists:sort(DefNodes), - other_grps = DefOther, connect_all = Ca}} - end. + GGC = spawn_link(fun global_group_check_dispatcher/0), + register(global_group_check, GGC), + put(global_group_check, GGC), + + %% There are most likely no connected nodes at this stage, + %% but check to make sure... + Conns = lists:foldl(fun ({N, #{connection_id := CId}}, Cs) -> + Cs#{N => CId} + end, + #{}, + nodes(visible, #{connection_id => true})), + S = initial_group_setup(fetch_new_group_conf(true, node()), Conns, #{}), + {ok, S}. + +initial_group_setup(#gconf{state = no_conf}, Conns, Reqs) -> + #state{connections = Conns, + rpc_requests = Reqs}; +initial_group_setup(#gconf{state = {error, _Err, NodeGrps}}, + _Conns, _Reqs) -> + exit({error, {'invalid global_groups definition', NodeGrps}}); +initial_group_setup(#gconf{node_name = NodeName, + group_name = DefGroupName, + group_list = DefNodesT, + group_publish_type = PubTpGrp, + other_groups = DefOther}, Conns, Reqs) -> + + DefNodes = lists:delete(NodeName, DefNodesT), + ConnectedNodes = maps:keys(Conns), + DisconnectNodes = ConnectedNodes -- DefNodes, + NotConnectedOwnNodes = DefNodes -- ConnectedNodes, + ConnectedOwnNodes = DefNodes -- NotConnectedOwnNodes, + %% First disconnect any nodes not belonging to our own group + disconnect_nodes(DisconnectNodes, Conns), + + %% Schedule group consistency check for all nodes. The response + %% is later handled in handle_rpc_response(). + NewReqs = schedule_conf_changed_checks(DefNodes, Reqs, Conns), + + %% Set connected nodes in sync_error state since, we + %% have not yet verified their configuration... 
+ Nodes0 = lists:foldl(fun (Node, Acc) -> + Acc#{Node => sync_error} + end, + #{}, ConnectedOwnNodes), + Nodes = lists:foldl(fun (Node, Acc) -> + Acc#{Node => no_contact} + end, + Nodes0, NotConnectedOwnNodes), + + #state{group_publish_type = PubTpGrp, + sync_state = synced, group_name = DefGroupName, + nodes = Nodes, other_grps = DefOther, + connections = Conns, rpc_requests = NewReqs}. %%%==================================================================================== %%% sync() -> ok @@ -284,37 +328,51 @@ init([]) -> %%% a release upgrade. It can also be ordered if somthing has made the nodes %%% to disagree of the global_groups definition. %%%==================================================================================== -handle_call(sync, _From, S) -> +handle_call(sync, _From, #state{nodes = OldNodes, + connections = Conns} = S) -> % io:format("~p sync ~p~n",[node(), application:get_env(kernel, global_groups)]), - case application:get_env(kernel, global_groups) of - undefined -> - update_publish_nodes(S#state.publish_type), - {reply, ok, S}; - {ok, []} -> - update_publish_nodes(S#state.publish_type), - {reply, ok, S}; - {ok, NodeGrps} -> - {DefGroupName, PubTpGrp, DefNodes, DefOther} = - case catch config_scan(NodeGrps, publish_type) of - {error, _Error2} -> - exit({error, {'invalid global_groups definition', NodeGrps}}); - {DefGroupNameT, PubType, DefNodesT, DefOtherT} -> - update_publish_nodes(S#state.publish_type, {PubType, DefNodesT}), - %% First inform global on all nodes not belonging to our own group - disconnect_nodes(nodes(connected) -- DefNodesT), - %% Sync with the nodes in the own group - kill_global_group_check(), - Pid = spawn_link(?MODULE, sync_init, - [sync, DefGroupNameT, PubType, DefNodesT]), - register(global_group_check, Pid), - {DefGroupNameT, PubType, lists:delete(node(), DefNodesT), DefOtherT} - end, - {reply, ok, S#state{sync_state = synced, group_name = DefGroupName, - no_contact = lists:sort(DefNodes), - other_grps = 
DefOther, group_publish_type = PubTpGrp}} - end; - + case lookup_group_conf(true) of + #gconf{state = no_conf, + group_name = DefGroupName, + group_list = _DefNodesT, + group_publish_type = PubTpGrp, + other_groups = DefOther} -> + {reply, ok, S#state{sync_state = no_conf, + group_name = DefGroupName, + nodes = #{}, + group_publish_type = PubTpGrp, + other_grps = DefOther}}; + + #gconf{state = {error, _Err, NodeGrps}} -> + exit({error, {'invalid global_groups definition', NodeGrps}}); + + #gconf{group_name = DefGroupName, + group_list = DefNodesT, + group_publish_type = PubTpGrp, + other_groups = DefOther} -> + DefNodes = lists:delete(node(), DefNodesT), + %% First inform global on all nodes not belonging to our own group + disconnect_nodes(nodes() -- DefNodes, Conns), + %% Sync with the nodes in the own group + + SyncSession = make_ref(), + + CCMsg = {conf_check, ?cc_vsn, node(), {self(), SyncSession}, + sync, DefGroupName, PubTpGrp, DefNodesT}, + {NewNodes, Mons} + = lists:foldl(fun (N, {Nacc, Macc}) -> + GG = {global_group, N}, + M = erlang:monitor(process, GG), + gen_server:cast(GG, CCMsg), + NS = maps:get(N, OldNodes, no_contact), + {Nacc#{N => NS}, Macc#{N => M}} + end, {#{}, #{}}, DefNodes), + {reply, ok, S#state{sync_state = synced, group_name = DefGroupName, + nodes = NewNodes, other_grps = DefOther, + group_publish_type = PubTpGrp, + config_check = {SyncSession, Mons}}} + end; %%%==================================================================================== %%% global_groups() -> {OwnGroupName, [OtherGroupName]} | undefined @@ -347,7 +405,6 @@ handle_call({monitor_nodes, Flag}, {Pid, _}, StateIn) -> {Res, State} = monitor_nodes(Flag, Pid, StateIn), {reply, Res, State}; - %%%==================================================================================== %%% own_nodes() -> [Node] %%% @@ -358,8 +415,7 @@ handle_call(own_nodes, _From, S) -> no_conf -> [node() | nodes()]; synced -> - get_own_nodes() -% S#state.nodes + get_own_nodes(true) end, 
{reply, Nodes, S}; @@ -503,22 +559,27 @@ handle_call({whereis_name, {node, Node}, Name}, From, S) -> %%% The node is not resynced automatically because it would cause this node to %%% be disconnected from those nodes not yet been upgraded. %%%==================================================================================== -handle_call({global_groups_changed, NewPara}, _From, S) -> - {NewGroupName, PubTpGrp, NewNodes, NewOther} = - case catch config_scan(NewPara, publish_type) of - {error, _Error2} -> - exit({error, {'invalid global_groups definition', NewPara}}); - {DefGroupName, PubType, DefNodes, DefOther} -> - update_publish_nodes(S#state.publish_type, {PubType, DefNodes}), - {DefGroupName, PubType, DefNodes, DefOther} - end, - - %% #state.nodes is the common denominator of previous and new definition - NN = NewNodes -- (NewNodes -- S#state.nodes), - %% rest of the nodes in the new definition are marked as not yet contacted - NNC = (NewNodes -- S#state.nodes) -- S#state.sync_error, - %% remove sync_error nodes not belonging to the new group - NSE = NewNodes -- (NewNodes -- S#state.sync_error), +handle_call({global_groups_changed, NewPara}, _From, + #state{rpc_requests = Reqs, + nodes = OldNodes, + connections = Conns} = S) -> + + #gconf{group_name = NewGroupName, + group_publish_type = PubTpGrp, + group_list = NewNodesListT, + other_groups = NewOther, + state = GState} = new_group_conf(true, NewPara), + + case GState of + no_conf -> + exit({error, 'no global_groups definiton'}); + {error, _Err, NodeGrps} -> + exit({error, {'invalid global_groups definition', NodeGrps}}); + _ -> + ok + end, + + NewNodesList = lists:delete(node(), NewNodesListT), %% Disconnect the connection to nodes which are not in our old global group. %% This is done because if we already are aware of new nodes (to our global @@ -528,59 +589,78 @@ handle_call({global_groups_changed, NewPara}, _From, S) -> %% manually force a sync of the nodes after all nodes have been upgraded. 
%% We must disconnect also if some nodes to which we have a connection %% will not be in any global group at all. - force_nodedown(nodes(connected) -- NewNodes), + force_nodedown(nodes(connected) -- NewNodesList, Conns), - NewS = S#state{group_name = NewGroupName, - nodes = lists:sort(NN), - no_contact = lists:sort(lists:delete(node(), NNC)), - sync_error = lists:sort(NSE), + NewNodes = lists:foldl(fun (N, Nacc) -> + NS = maps:get(N, OldNodes, no_contact), + Nacc#{N => NS} + end, #{}, NewNodesList), + + %% Schedule group consistency check due to config change. The response is + %% later handled in handle_rpc_response()... + NewReqs = schedule_conf_changed_checks(NewNodesList, Reqs, Conns), + + NewS = S#state{group_name = NewGroupName, + nodes = NewNodes, other_grps = NewOther, - group_publish_type = PubTpGrp}, + group_publish_type = PubTpGrp, + rpc_requests = NewReqs, + config_check = undefined}, {reply, ok, NewS}; - %%%==================================================================================== %%% global_groups parameter added %%% The node is not resynced automatically because it would cause this node to %%% be disconnected from those nodes not yet been upgraded. 
%%%==================================================================================== -handle_call({global_groups_added, NewPara}, _From, S) -> +handle_call({global_groups_added, NewPara}, _From, + #state{connections = Conns, + rpc_requests = Reqs} = S) -> % io:format("### global_groups_changed, NewPara ~p ~n",[NewPara]), - {NewGroupName, PubTpGrp, NewNodes, NewOther} = - case catch config_scan(NewPara, publish_type) of - {error, _Error2} -> - exit({error, {'invalid global_groups definition', NewPara}}); - {DefGroupName, PubType, DefNodes, DefOther} -> - update_publish_nodes(S#state.publish_type, {PubType, DefNodes}), - {DefGroupName, PubType, DefNodes, DefOther} - end, + + #gconf{group_name = NewGroupName, + group_publish_type = PubTpGrp, + group_list = NewNodesList, + other_groups = NewOther, + state = GState} = new_group_conf(true, NewPara), + + case GState of + no_conf -> + exit({error, 'no global_groups definiton'}); + {error, _Err, NodeGrps} -> + exit({error, {'invalid global_groups definition', NodeGrps}}); + _ -> + ok + end, %% disconnect from those nodes which are not going to be in our global group - force_nodedown(nodes(connected) -- NewNodes), + force_nodedown(nodes(connected) -- NewNodesList, Conns), %% Check which nodes are already updated - OwnNG = get_own_nodes(), - NGACArgs = case S#state.group_publish_type of - normal -> - [node(), OwnNG]; - _ -> - [node(), S#state.group_publish_type, OwnNG] - end, - {NN, NNC, NSE} = - lists:foldl(fun(Node, {NN_acc, NNC_acc, NSE_acc}) -> - case rpc:call(Node, global_group, ng_add_check, NGACArgs) of - {badrpc, _} -> - {NN_acc, [Node | NNC_acc], NSE_acc}; - agreed -> - {[Node | NN_acc], NNC_acc, NSE_acc}; - not_agreed -> - {NN_acc, NNC_acc, [Node | NSE_acc]} - end - end, - {[], [], []}, lists:delete(node(), NewNodes)), - NewS = S#state{sync_state = synced, group_name = NewGroupName, nodes = lists:sort(NN), - sync_error = lists:sort(NSE), no_contact = lists:sort(NNC), - other_grps = NewOther, group_publish_type = 
PubTpGrp}, + NGACArgs = [node(), PubTpGrp, NewNodesList], + + %% Schedule ng_add_check on all nodes in our configuration and build up + %% the nodes map. The responses to the ng_add_check are later handled in + %% handle_rpc_response(). + {NewReqs, NewNodes} + = lists:foldl( + fun (N, {Racc, Nacc}) -> + CId = maps:get(N, Conns, not_connected), + NRacc = rpc_send_request(N, ?MODULE, ng_add_check, + NGACArgs, + {ng_add_check, N, CId}, + Racc), + What = if CId == not_connected -> no_contact; + true -> sync_error + end, + {NRacc, Nacc#{N => What}} + end, + {Reqs, #{}}, lists:delete(node(), NewNodesList)), + + NewS = S#state{sync_state = synced, group_name = NewGroupName, + nodes = NewNodes, rpc_requests = NewReqs, + other_grps = NewOther, group_publish_type = PubTpGrp, + config_check = undefined}, {reply, ok, NewS}; @@ -589,10 +669,16 @@ handle_call({global_groups_added, NewPara}, _From, S) -> %%%==================================================================================== handle_call({global_groups_removed, _NewPara}, _From, S) -> % io:format("### global_groups_removed, NewPara ~p ~n",[_NewPara]), - update_publish_nodes(S#state.publish_type), - NewS = S#state{sync_state = no_conf, group_name = [], nodes = [], - sync_error = [], no_contact = [], - other_grps = []}, + + #gconf{group_name = NewGroupName, + group_publish_type = PubTpGrp, + group_list = _NewNodes, + other_groups = NewOther, + state = no_conf} = new_group_conf(true, undefined), + + NewS = S#state{sync_state = no_conf, group_name = NewGroupName, nodes = #{}, + other_grps = NewOther, group_publish_type = PubTpGrp, + config_check = undefined}, {reply, ok, NewS}; @@ -601,40 +687,37 @@ handle_call({global_groups_removed, _NewPara}, _From, S) -> %%% belong to the same global group. 
%%% It could happen that our node is not yet updated with the new node_group parameter %%%==================================================================================== -handle_call({ng_add_check, Node, PubType, OthersNG}, _From, S) -> +handle_call({ng_add_check, Node, PubType, OthersNG}, _From, + #state{group_publish_type = OwnPubType} = S) -> %% Check which nodes are already updated - OwnNG = get_own_nodes(), - case S#state.group_publish_type =:= PubType of - true -> - case OwnNG of - OthersNG -> - NN = [Node | S#state.nodes], - NSE = lists:delete(Node, S#state.sync_error), - NNC = lists:delete(Node, S#state.no_contact), - NewS = S#state{nodes = lists:sort(NN), - sync_error = NSE, - no_contact = NNC}, - {reply, agreed, NewS}; - _ -> - {reply, not_agreed, S} - end; - _ -> - {reply, not_agreed, S} + OwnNodes = get_own_nodes(true), + case {PubType, lists:sort(OthersNG)} of + {OwnPubType, OwnNodes} -> + {reply, agreed, node_state(sync, Node, S)}; + _ -> + {reply, not_agreed, node_state(sync_error, Node, S)} end; - - %%%==================================================================================== %%% Misceleaneous help function to read some variables %%%==================================================================================== -handle_call(info, _From, S) -> +handle_call(info, _From, S) -> + {InSync, SyncError, NoContact} + = maps:fold(fun (N, sync, {ISacc, SEacc, NCacc}) -> + {[N|ISacc], SEacc, NCacc}; + (N, sync_error, {ISacc, SEacc, NCacc}) -> + {ISacc, [N|SEacc], NCacc}; + (N, no_contact, {ISacc, SEacc, NCacc}) -> + {ISacc, SEacc, [N|NCacc]} + end, + {[],[],[]}, + S#state.nodes), Reply = [{state, S#state.sync_state}, {own_group_name, S#state.group_name}, - {own_group_nodes, get_own_nodes()}, -% {"nodes()", lists:sort(nodes())}, - {synced_nodes, S#state.nodes}, - {sync_error, S#state.sync_error}, - {no_contact, S#state.no_contact}, + {own_group_nodes, get_own_nodes(true)}, + {synced_nodes, lists:sort(InSync)}, + {sync_error, 
lists:sort(SyncError)}, + {no_contact, lists:sort(NoContact)}, {other_groups, S#state.other_grps}, {monitoring, S#state.monitor}], @@ -755,221 +838,270 @@ handle_cast({find_name_res, Result, Pid, From}, S) -> gen_server:reply(From, Result), {noreply, S}; - -%%%==================================================================================== -%%% The node is synced successfully -%%%==================================================================================== -handle_cast({synced, NoContact}, S) -> -% io:format("~p>>>>> synced ~p ~n",[node(), NoContact]), - kill_global_group_check(), - Nodes = get_own_nodes() -- [node() | NoContact], - {noreply, S#state{nodes = lists:sort(Nodes), - sync_error = [], - no_contact = NoContact}}; - - -%%%==================================================================================== -%%% The node could not sync with some other nodes. -%%%==================================================================================== -handle_cast({sync_error, NoContact, ErrorNodes}, S) -> -% io:format("~p>>>>> sync_error ~p ~p ~n",[node(), NoContact, ErrorNodes]), - Txt = io_lib:format("Global group: Could not synchronize with these nodes ~p~n" - "because global_groups were not in agreement. 
~n", [ErrorNodes]), - error_logger:error_report(Txt), - kill_global_group_check(), - Nodes = (get_own_nodes() -- [node() | NoContact]) -- ErrorNodes, - {noreply, S#state{nodes = lists:sort(Nodes), - sync_error = ErrorNodes, - no_contact = NoContact}}; - - %%%==================================================================================== %%% Another node is checking this node's group configuration %%%==================================================================================== handle_cast({conf_check, Vsn, Node, From, sync, CCName, CCNodes}, S) -> handle_cast({conf_check, Vsn, Node, From, sync, CCName, normal, CCNodes}, S); -handle_cast({conf_check, Vsn, Node, From, sync, CCName, PubType, CCNodes}, S) -> - CurNodes = S#state.nodes, +handle_cast({conf_check, Vsn, Node, From, sync, CCName, PubType, CCNodes}, + #state{connections = Conns} = S) -> % io:format(">>>>> conf_check,sync Node ~p~n",[Node]), %% Another node is syncing, %% done for instance after upgrade of global_groups parameter - NS = - case application:get_env(kernel, global_groups) of - undefined -> - %% We didn't have any node_group definition - update_publish_nodes(S#state.publish_type), - disconnect_nodes([Node]), - {global_group_check, Node} ! {config_error, Vsn, From, node()}, - S; - {ok, []} -> - %% Our node_group definition was empty - update_publish_nodes(S#state.publish_type), - disconnect_nodes([Node]), - {global_group_check, Node} ! {config_error, Vsn, From, node()}, - S; - %%--------------------------------- - %% global_groups defined - %%--------------------------------- - {ok, NodeGrps} -> - case catch config_scan(NodeGrps, publish_type) of - {error, _Error2} -> - %% Our node_group definition was erroneous - disconnect_nodes([Node]), - {global_group_check, Node} ! 
{config_error, Vsn, From, node()}, - S#state{nodes = lists:delete(Node, CurNodes)}; - - {CCName, PubType, CCNodes, _OtherDef} -> - %% OK, add the node to the #state.nodes if it isn't there - update_publish_nodes(S#state.publish_type, {PubType, CCNodes}), - global_name_server ! {nodeup, Node}, - {global_group_check, Node} ! {config_ok, Vsn, From, node()}, - case lists:member(Node, CurNodes) of - false -> - NewNodes = lists:sort([Node | CurNodes]), - NSE = lists:delete(Node, S#state.sync_error), - NNC = lists:delete(Node, S#state.no_contact), - S#state{nodes = NewNodes, - sync_error = NSE, - no_contact = NNC}; - true -> - S - end; - _ -> - %% node_group definitions were not in agreement - disconnect_nodes([Node]), - {global_group_check, Node} ! {config_error, Vsn, From, node()}, - NN = lists:delete(Node, S#state.nodes), - NSE = lists:delete(Node, S#state.sync_error), - NNC = lists:delete(Node, S#state.no_contact), - S#state{nodes = NN, - sync_error = NSE, - no_contact = NNC} - end - end, - {noreply, NS}; - + try + CId = case maps:get(Node, Conns, undefined) of + undefined -> + %% We got garbage from someone... + throw({noreply, S}); + CId0 -> + CId0 + end, + To = if is_integer(Vsn) andalso Vsn >= 3 -> + case From of + {Pid, _Session} when is_pid(Pid) -> + %% New node that does not use the + %% global_group_check process... + Pid; + _Garbage -> + %% We got garbage from someone... + throw({noreply, S}) + end; + true -> + %% Old node that still use the + %% global_group_check process... + {global_group_check, Node} + end, + case lookup_group_conf(true) of + #gconf{state = no_conf} -> + %% We didn't have any node_group definition + disconnect_nodes([Node], Conns), + To ! {config_error, Vsn, From, node()}, + {noreply, S}; + + #gconf{state = {error, _Err, _NodeGrps}} -> + disconnect_nodes([Node], Conns), + To ! 
{config_error, Vsn, From, node()}, + {noreply, node_state(remove, Node, S)}; + + #gconf{group_name = CCName, + group_list = CCNodes, + group_publish_type = PubType} -> + %% OK, change the state of the node to 'sync' + global_name_server ! {group_nodeup, Node, CId}, + To ! {config_ok, Vsn, From, node()}, + {noreply, node_state(sync, Node, S)}; + + #gconf{} -> + %% group definitions were not in agreement + disconnect_nodes([Node], Conns), + To ! {config_error, Vsn, From, node()}, + {noreply, node_state(sync_error, Node, S)} + end + catch + throw:{noreply, _} = Return -> Return + end; handle_cast(_Cast, S) -> % io:format("***** handle_cast ~p~n",[_Cast]), {noreply, S}. - +handle_info(Msg, #state{rpc_requests = Requests} = S) -> + try rpc_check_response(Msg, Requests) of + NoMatch when NoMatch == no_request; NoMatch == no_response -> + continue_handle_info(Msg, S); + {Result, Label, NewRequests} -> + {noreply, + handle_rpc_response(ok, Result, Label, + S#state{rpc_requests = NewRequests})} + catch + Class:{Reason, Label, NewRequests} -> + {noreply, + handle_rpc_response(Class, Reason, Label, + S#state{rpc_requests = NewRequests})} + end. %%%==================================================================================== -%%% A node went down. If no global group configuration inform global; -%%% if global group configuration inform global only if the node is one in -%%% the own global group. +%%% Distribution on this node was started... %%%==================================================================================== -handle_info({nodeup, Node}, S) when S#state.sync_state =:= no_conf -> -% io:format("~p>>>>> nodeup, Node ~p ~n",[node(), Node]), +continue_handle_info({nodeup, Node, #{connection_id := undefined}}, + #state{connections = Conns, rpc_requests = Reqs}) -> + %% Check configuration since we now know how to interpret it + %% when we know our name... 
+ S = initial_group_setup(alive_state_change_group_conf(Node), Conns, Reqs), send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state), - global_name_server ! {nodeup, Node}, {noreply, S}; -handle_info({nodeup, Node}, S) -> + +%%%==================================================================================== +%%% A new node connected... +%%%==================================================================================== +continue_handle_info({nodeup, Node, #{connection_id := CId}}, + #state{sync_state = no_conf, + connections = Conns} = S) -> % io:format("~p>>>>> nodeup, Node ~p ~n",[node(), Node]), - OthersNG = case S#state.sync_state of - synced -> - X = (catch rpc:call(Node, global_group, get_own_nodes, [])), - case X of - X when is_list(X) -> - lists:sort(X); - _ -> - [] - end; - no_conf -> - [] - end, - - NNC = lists:delete(Node, S#state.no_contact), - NSE = lists:delete(Node, S#state.sync_error), - OwnNG = get_own_nodes(), - case OwnNG of - OthersNG -> - send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state), - global_name_server ! 
{nodeup, Node}, - case lists:member(Node, S#state.nodes) of - false -> - NN = lists:sort([Node | S#state.nodes]), - {noreply, S#state{nodes = NN, - no_contact = NNC, - sync_error = NSE}}; - true -> - {noreply, S#state{no_contact = NNC, - sync_error = NSE}} - end; - _ -> - case {lists:member(Node, get_own_nodes()), - lists:member(Node, S#state.sync_error)} of - {true, false} -> - NSE2 = lists:sort([Node | S#state.sync_error]), - {noreply, S#state{no_contact = NNC, - sync_error = NSE2}}; - _ -> - {noreply, S} - end + send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state), + {noreply, S#state{connections = Conns#{Node => CId}}}; +continue_handle_info({nodeup, Node, #{connection_id := CId}}, + #state{rpc_requests = Reqs, connections = Conns} = S) -> +% io:format("~p>>>>> nodeup, Node ~p ~n",[node(), Node]), + + NewConns = Conns#{Node => CId}, + case member(true, Node) of + false -> + %% Node is not part of our group configuration... + disconnect_nodes([Node], NewConns), + {noreply, S#state{connections = NewConns}}; + true -> + + %% Node is part of our group configuration. Check that it has the + %% same view of the configuration as us... + + NewReqs = rpc_send_request(Node, global_group, get_own_nodes, [], + {nodeup_conf_check, Node, CId}, Reqs), + + %% The response is later handled in handle_rpc_response()... + + %% Set the node in 'sync_error' state. It will later be changed + %% to 'sync' if it passes the 'nodeup_conf_check' test... + {noreply, node_state(sync_error, Node, + S#state{rpc_requests = NewReqs, + connections = NewConns})} end; %%%==================================================================================== +%%% Distribution on this node was shut down... +%%%==================================================================================== +continue_handle_info({nodedown, Node, #{connection_id := undefined}}, S) -> + %% Clear group configuration. We don't know how to interpret it + %% unless we know our own name... 
+ #gconf{state = no_conf, + group_name = DefGroupName, + group_list = _DefNodes, + group_publish_type = PubTpGrp, + other_groups = DefOther} = alive_state_change_group_conf(nonode@nohost), + send_monitor(S#state.monitor, {nodedown, Node}, no_conf), + {noreply, S#state{group_publish_type = PubTpGrp, + sync_state = no_conf, group_name = DefGroupName, + nodes = #{}, other_grps = DefOther, + config_check = undefined}}; + +%%%==================================================================================== %%% A node has crashed. -%%% nodedown must always be sent to global; this is a security measurement -%%% because during release upgrade the global_groups parameter is upgraded -%%% before the node is synced. This means that nodedown may arrive from a -%%% node which we are not aware of. %%%==================================================================================== -handle_info({nodedown, Node}, S) when S#state.sync_state =:= no_conf -> +continue_handle_info({nodedown, Node, _Info}, + #state{sync_state = no_conf, + monitor = Monitor, + connections = Conns} = S) -> % io:format("~p>>>>> nodedown, no_conf Node ~p~n",[node(), Node]), - send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state), - global_name_server ! {nodedown, Node}, - {noreply, S}; -handle_info({nodedown, Node}, S) -> + send_monitor(Monitor, {nodedown, Node}, no_conf), + {noreply, S#state{connections = maps:remove(Node, Conns)}}; +continue_handle_info({nodedown, Node, _Info}, + #state{sync_state = SyncState, + monitor = Monitor, + connections = Conns} = S) -> % io:format("~p>>>>> nodedown, Node ~p ~n",[node(), Node]), - send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state), - NN = lists:delete(Node, S#state.nodes), - NSE = lists:delete(Node, S#state.sync_error), - NNC = case lists:member(Node, get_own_nodes()) of - false -> - global_name_server ! {ignore_node, Node}, - S#state.no_contact; - true -> - global_name_server ! 
{nodedown, Node}, - case lists:member(Node, S#state.no_contact) of - false -> - [Node | S#state.no_contact]; - true -> - S#state.no_contact - end - end, - {noreply, S#state{nodes = NN, no_contact = NNC, sync_error = NSE}}; + send_monitor(Monitor, {nodedown, Node}, SyncState), + {noreply, node_state(no_contact, Node, + S#state{connections = maps:remove(Node, Conns)})}; %%%==================================================================================== %%% A node has changed its global_groups definition, and is telling us that we are not %%% included in his group any more. This could happen at release upgrade. %%%==================================================================================== -handle_info({disconnect_node, Node}, S) -> +continue_handle_info({disconnect_node, Node}, #state{monitor = Monitor, + sync_state = SyncState, + nodes = Nodes, + connections = Conns} = S) -> % io:format("~p>>>>> disconnect_node Node ~p CN ~p~n",[node(), Node, S#state.nodes]), - case {S#state.sync_state, lists:member(Node, S#state.nodes)} of - {synced, true} -> - send_monitor(S#state.monitor, {nodedown, Node}, S#state.sync_state); - _ -> - cont + case {SyncState, maps:get(Node, Nodes, not_member)} of + {synced, sync} -> send_monitor(Monitor, {nodedown, Node}, SyncState); + _ -> ok end, - global_name_server ! {ignore_node, Node}, - NN = lists:delete(Node, S#state.nodes), - NNC = lists:delete(Node, S#state.no_contact), - NSE = lists:delete(Node, S#state.sync_error), - {noreply, S#state{nodes = NN, no_contact = NNC, sync_error = NSE}}; - + CId = maps:get(Node, Conns, not_connected), + global_name_server ! 
{group_nodedown, Node, CId}, + {noreply, node_state(sync_error, Node, S)}; + +continue_handle_info({config_ok, ?cc_vsn, {Pid, CCSession}, Node}, + #state{config_check = {CCSession, Mons}, + connections = Conns} = S0) when Pid == self() -> + try + {Mon, NewMons} = case maps:take(Node, Mons) of + error -> throw({noreply, S0}); + MonTake -> MonTake + end, + erlang:demonitor(Mon), + S1 = if map_size(NewMons) == 0 -> + S0#state{config_check = undefined}; + true -> + S0#state{config_check = {CCSession, NewMons}} + end, + CId = case maps:get(Node, Conns, undefined) of + undefined -> throw({noreply, S1}); + CId0 -> CId0 + end, + global_name_server ! {group_nodeup, Node, CId}, + {noreply, node_state(sync, Node, S0)} + catch + throw:{noreply, _} = Return -> Return + end; +continue_handle_info({config_error, ?cc_vsn, {Pid, CCSession}, Node}, + #state{config_check = {CCSession, Mons}, + connections = Conns} = S0) when Pid == self() -> + try + {Mon, NewMons} = case maps:take(Node, Mons) of + error -> throw({noreply, S0}); + MonTake -> MonTake + end, + erlang:demonitor(Mon), + S1 = if map_size(NewMons) == 0 -> + S0#state{config_check = undefined}; + true -> + S0#state{config_check = {CCSession, NewMons}} + end, + CId = maps:get(Node, Conns, not_connected), + global_name_server ! {group_nodedown, Node, CId}, + log_sync_error(Node), + {noreply, node_state(sync_error, Node, S1)} + catch + throw:{noreply, _} = Return -> Return + end; +continue_handle_info({'DOWN', Mon, process, {global_group, Node}, Reason}, + #state{config_check = {CCSession, Mons}, + connections = Conns} = S0) -> + %% This clause needs to be the last one matching on 'DOWN' + %% messages... + try + NewMons = case maps:take(Node, Mons) of + {Mon, NewMons0} -> NewMons0; + _ -> throw({noreply, S0}) + end, + S1 = if map_size(NewMons) == 0 -> + S0#state{config_check = undefined}; + true -> + S0#state{config_check = {CCSession, NewMons}} + end, + CId = maps:get(Node, Conns, not_connected), + global_name_server ! 
{group_nodedown, Node, CId}, + What = if Reason == noconnection -> + no_contact; + true -> + log_sync_error(Node), + sync_error + end, + {noreply, node_state(What, Node, S1)} + catch + throw:{noreply, _} = Return -> Return + end; -handle_info({'EXIT', ExitPid, Reason}, S) -> +continue_handle_info({'EXIT', ExitPid, Reason}, S) -> check_exit(ExitPid, Reason), {noreply, S}; -handle_info(_Info, S) -> +continue_handle_info(_Info, S) -> % io:format("***** handle_info = ~p~n",[_Info]), {noreply, S}. @@ -982,26 +1114,176 @@ terminate(_Reason, _S) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +log_sync_error(Node) -> + Txt = io_lib:format("global_group: Could not synchronize with node ~p~n" + "because global_groups parameter were not in agreement.~n", + [Node]), + error_logger:error_report(Txt), + ok. +%%%==================================================================================== +%%% Schedule group consistency check due to config change. The response is +%%% later handled in handle_rpc_response()... +%%%==================================================================================== - +schedule_conf_changed_checks(Nodes, Requests, Connections) -> + lists:foldl(fun (Node, RequestsAcc) -> + CId = maps:get(Node, Connections, not_connected), + rpc_send_request(Node, global_group, get_own_nodes, [], + {conf_changed_check, Node, CId}, + RequestsAcc) + end, Requests, Nodes). %%%==================================================================================== -%%% Check the global group configuration. +%%% We got a response to an asynchronous rpc request %%%==================================================================================== -config_scan(NodeGrps) -> - config_scan(NodeGrps, original). 
+handle_rpc_response(ok, Nodes, {nodeup_conf_check, Node, ReqCId}, + #state{connections = Conns} = S) when is_list(Nodes) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + OwnNodes = get_own_nodes(true), + + case lists:sort(Nodes) of + OwnNodes -> %% Node has the same group config as us... + send_monitor(S#state.monitor, {nodeup, Node}, S#state.sync_state), + global_name_server ! {group_nodeup, Node, CId}, + node_state(sync, Node, S); -config_scan(NodeGrps, original) -> - case config_scan(NodeGrps, publish_type) of - {DefGroupName, _, DefNodes, DefOther} -> - {DefGroupName, DefNodes, DefOther}; - Error -> - Error + _ -> %% Node has not the same group config but is in our config... + disconnect_nodes([Node], Conns), + node_state(sync_error, Node, S) + end; + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... + S + end; +handle_rpc_response(error, nodedown, {nodeup_conf_check, Node, ReqCId}, + #state{connections = Conns} = S) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + node_state(no_contact, Node, S); + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... + S end; -config_scan(NodeGrps, publish_type) -> - config_scan(node(), normal, NodeGrps, no_name, [], []). +handle_rpc_response(_, _, {nodeup_conf_check, Node, ReqCId}, + #state{connections = Conns} = S) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + disconnect_nodes([Node], Conns), + node_state(sync_error, Node, S); + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... 
+ S + end; +handle_rpc_response(ok, Nodes, {conf_changed_check, Node, ReqCId}, + #state{connections = Conns} = S) when is_list(Nodes) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + OwnNodes = get_own_nodes(true), + + case lists:sort(Nodes) of + OwnNodes -> %% Node has the same group config as us... + node_state(sync, Node, S); + + _ -> %% Node has not the same group config but is in our config... + disconnect_nodes([Node], Conns), + node_state(sync_error, Node, S) + end; + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... + S + end; +handle_rpc_response(error, nodedown, {conf_changed_check, Node, ReqCId}, + #state{connections = Conns} = S) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + node_state(no_contact, Node, S); + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... + S + end; +handle_rpc_response(_, _, {conf_changed_check, Node, ReqCId}, + #state{connections = Conns} = S) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + disconnect_nodes([Node], Conns), + node_state(sync_error, Node, S); + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... + S + end; +handle_rpc_response(ok, agreed, {ng_add_check, Node, ReqCId}, + #state{connections = Conns} = S) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + node_state(sync, Node, S); + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... 
+ S + end; +handle_rpc_response(error, nodedown, {ng_add_check, Node, ReqCId}, + #state{connections = Conns} = S) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + node_state(no_contact, Node, S); + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... + S + end; +handle_rpc_response(_, _, {ng_add_check, Node, ReqCId}, + #state{connections = Conns} = S) -> + case maps:get(Node, Conns, undefined) of + CId when ReqCId == CId orelse (ReqCId == not_connected + andalso is_integer(CId)) -> + disconnect_nodes([Node], Conns), + node_state(sync_error, Node, S); + _ -> + %% Connection has gone down since we issued the request; + %% no action taken... + S + end. + + +%%%==================================================================================== +%%% Change state of node +%%%==================================================================================== + +node_state(What, Node, #state{nodes = Ns} = S) + when What == sync; What == sync_error; What == no_contact -> + case member(true, Node) of + true -> S#state{nodes = Ns#{Node => What}}; + false -> S#state{nodes = maps:remove(Node, Ns)} + end; +node_state(remove, Node, #state{nodes = Ns} = S) -> + case member(true, Node) of + true -> error({removing_node_state_of_member_node, Node}); + false -> S#state{nodes = maps:remove(Node, Ns)} + end. + +%%%==================================================================================== +%%% Check the global group configuration. +%%%==================================================================================== + +config_scan(MyNode, NodeGrps) -> + config_scan(MyNode, normal, NodeGrps, no_name, [], []). 
config_scan(_MyNode, PubType, [], Own_name, OwnNodes, OtherNodeGrps) -> {Own_name, PubType, lists:sort(OwnNodes), lists:reverse(OtherNodeGrps)}; @@ -1027,97 +1309,120 @@ grp_tuple({Name, hidden, Nodes}) -> grp_tuple({Name, normal, Nodes}) -> {Name, normal, Nodes}. - %%%==================================================================================== -%%% The special process which checks that all nodes in the own global group -%%% agrees on the configuration. -%%%==================================================================================== --spec sync_init(_, _, _, _) -> no_return(). -sync_init(Type, Cname, PubType, Nodes) -> - {Up, Down} = sync_check_node(lists:delete(node(), Nodes), [], []), - sync_check_init(Type, Up, Cname, Nodes, Down, PubType). - -sync_check_node([], Up, Down) -> - {Up, Down}; -sync_check_node([Node|Nodes], Up, Down) -> - case net_adm:ping(Node) of - pang -> - sync_check_node(Nodes, Up, [Node|Down]); - pong -> - sync_check_node(Nodes, [Node|Up], Down) - end. +%%% Get/set configuration +%%%==================================================================================== +%% +%% Fetch and install group configuration... +%% +fetch_new_group_conf(GG) -> + fetch_new_group_conf(GG, undefined). +fetch_new_group_conf(GG, NodeName) -> + GGConf = case application:get_env(kernel, global_groups) of + undefined -> undefined; + {ok, V} -> V + end, + new_group_conf(GG, GGConf, NodeName). + +%% +%% Install new group configuration... +%% +new_group_conf(GG, KernParamValue) -> + new_group_conf(GG, KernParamValue, undefined). 
+ +new_group_conf(GG, KernParamValue, NodeName) -> + case persistent_term:get(?MODULE, #gconf{}) of + #gconf{parameter_value = KernParamValue, + node_name = Name} = GConf when NodeName == Name; + NodeName == undefined -> + GConf; + #gconf{node_name = Name} -> + UseNodeName = if NodeName == undefined -> Name; + true -> NodeName + end, + GConf = make_group_conf(UseNodeName, KernParamValue), + %% Only save in persistent term if called by + %% the global_group server... + if GG == true -> persistent_term:put(?MODULE, GConf); + true -> ok + end, + GConf + end. -%%%------------------------------------------------------------- -%%% Check that all nodes are in agreement of the global -%%% group configuration. -%%%------------------------------------------------------------- --spec sync_check_init(_, _, _, _, _, _) -> no_return(). -sync_check_init(Type, Up, Cname, Nodes, Down, PubType) -> - sync_check_init(Type, Up, Cname, Nodes, 3, [], Down, PubType). +make_group_conf(NodeName, KernParamValue) when KernParamValue == undefined; + KernParamValue == []; + NodeName == nonode@nohost -> + %% Empty group configuration if it is not defined, or if we are not + %% alive (if we are not alive we cannot interpret the configuration + %% since we don't know our own node name)... + #gconf{parameter_value = KernParamValue, + node_name = NodeName}; +make_group_conf(NodeName, KernParamValue) -> + case catch config_scan(NodeName, KernParamValue) of + + {error, Error} -> + #gconf{parameter_value = KernParamValue, + node_name = NodeName, + state = {error, Error, KernParamValue}}; + + {GName, PubTpGrp, OwnNodes, OtherGroups} -> + GMap = if OwnNodes == [] -> + all; + true -> + maps:from_list(lists:map(fun (Node) -> + {Node, ok} + end, OwnNodes)) + end, + #gconf{parameter_value = KernParamValue, + node_name = NodeName, + group_name = GName, + group_publish_type = PubTpGrp, + group_list = lists:sort(OwnNodes), + group_map = GMap, + other_groups = OtherGroups, + state = conf} + end. 
--spec sync_check_init(_, _, _, _, _, _, _, _) -> no_return(). -sync_check_init(_Type, NoContact, _Cname, _Nodes, 0, ErrorNodes, Down, _PubType) -> - case ErrorNodes of - [] -> - gen_server:cast(global_group, {synced, lists:sort(NoContact ++ Down)}); - _ -> - gen_server:cast(global_group, {sync_error, lists:sort(NoContact ++ Down), - ErrorNodes}) - end, - receive - kill -> - exit(normal) - after 5000 -> - exit(normal) - end; +%% +%% Adjust group configuration according to alive state +%% +alive_state_change_group_conf(NodeName) when NodeName /= undefined -> + case persistent_term:get(?MODULE, #gconf{}) of + #gconf{parameter_value = ParamValue} when ParamValue /= invalid -> + new_group_conf(true, ParamValue, NodeName); + #gconf{} -> + fetch_new_group_conf(true, NodeName) + end. -sync_check_init(Type, Up, Cname, Nodes, N, ErrorNodes, Down, PubType) -> - ConfCheckMsg = case PubType of - normal -> - {conf_check, ?cc_vsn, node(), self(), Type, Cname, Nodes}; - _ -> - {conf_check, ?cc_vsn, node(), self(), Type, Cname, PubType, Nodes} - end, - lists:foreach(fun(Node) -> - gen_server:cast({global_group, Node}, ConfCheckMsg) - end, Up), - case sync_check(Up) of - {ok, synced} -> - sync_check_init(Type, [], Cname, Nodes, 0, ErrorNodes, Down, PubType); - {error, NewErrorNodes} -> - sync_check_init(Type, [], Cname, Nodes, 0, ErrorNodes ++ NewErrorNodes, Down, PubType); - {more, Rem, NewErrorNodes} -> - %% Try again to reach the global_group, - %% obviously the node is up but not the global_group process. - sync_check_init(Type, Rem, Cname, Nodes, N-1, ErrorNodes ++ NewErrorNodes, Down, PubType) +%% +%% Lookup current group configuration +%% + +lookup_group_conf(GG) -> + try + persistent_term:get(?MODULE) + catch + error:badarg -> fetch_new_group_conf(GG) end. -sync_check(Up) -> - sync_check(Up, Up, []). +%%%==================================================================================== +%%% The global_group_check process. 
It used to take care of syncing other nodes, +%%% but nowadays only serve as a dispatcher config check reply messages from old +%%% global_group servers to our global_group server. +%%%==================================================================================== -sync_check([], _Up, []) -> - {ok, synced}; -sync_check([], _Up, ErrorNodes) -> - {error, ErrorNodes}; -sync_check(Rem, Up, ErrorNodes) -> +global_group_check_dispatcher() -> receive - {config_ok, ?cc_vsn, Pid, Node} when Pid =:= self() -> - global_name_server ! {nodeup, Node}, - sync_check(Rem -- [Node], Up, ErrorNodes); - {config_error, ?cc_vsn, Pid, Node} when Pid =:= self() -> - sync_check(Rem -- [Node], Up, [Node | ErrorNodes]); - {no_global_group_configuration, ?cc_vsn, Pid, Node} when Pid =:= self() -> - sync_check(Rem -- [Node], Up, [Node | ErrorNodes]); - %% Ignore, illegal vsn or illegal Pid - _ -> - sync_check(Rem, Up, ErrorNodes) - after 2000 -> - %% Try again, the previous conf_check message - %% apparently disapared in the magic black hole. - {more, Rem, ErrorNodes} - end. + {config_ok, _Vsn, _From, _Node} = Msg -> + global_group ! Msg, ok; + {config_error, _Vsn, _From, _Node} = Msg -> + global_group ! Msg, ok; + _Garbage -> + ok + end, + global_group_check_dispatcher(). %%%==================================================================================== @@ -1164,7 +1469,7 @@ send_monitor([], _, _) -> ok. safesend(Name, {Msg, Node}) when is_atom(Name) -> - case lists:member(Node, get_own_nodes()) of + case member(true, Node) of true -> case whereis(Name) of undefined -> @@ -1176,7 +1481,7 @@ safesend(Name, {Msg, Node}) when is_atom(Name) -> not_own_group end; safesend(Pid, {Msg, Node}) -> - case lists:member(Node, get_own_nodes()) of + case member(true, Node) of true -> Pid ! 
{Msg, Node}; false -> @@ -1205,7 +1510,8 @@ check_exit(ExitPid, Reason) -> % io:format("===EXIT=== ~p ~p ~n~p ~n~p ~n~p ~n~n",[ExitPid, Reason, get(registered_names), get(send), get(whereis_name)]), check_exit_reg(get(registered_names), ExitPid, Reason), check_exit_send(get(send), ExitPid, Reason), - check_exit_where(get(whereis_name), ExitPid, Reason). + check_exit_where(get(whereis_name), ExitPid, Reason), + check_exit_ggc(ExitPid, Reason). check_exit_reg(undefined, _ExitPid, _Reason) -> @@ -1246,29 +1552,23 @@ check_exit_where(Where, ExitPid, Reason) -> not_found_ignored end. - - -%%%==================================================================================== -%%% Kill any possible global_group_check processes -%%%==================================================================================== -kill_global_group_check() -> - case whereis(global_group_check) of - undefined -> - ok; - Pid -> - unlink(Pid), - global_group_check ! kill, - unregister(global_group_check) +check_exit_ggc(ExitPid, Reason) -> + case get(global_group_check) of + ExitPid -> + %% Our global_group_check companion died; terminate... + exit(Reason); + _ -> + ok end. - %%%==================================================================================== %%% Disconnect nodes not belonging to own global_groups %%%==================================================================================== -disconnect_nodes(DisconnectNodes) -> +disconnect_nodes(DisconnectNodes, Conns) -> lists:foreach(fun(Node) -> - {global_group, Node} ! {disconnect_node, node()}, - global_name_server ! {ignore_node, Node} + CId = maps:get(Node, Conns, not_connected), + global_name_server ! {group_nodedown, Node, CId}, + {global_group, Node} ! {disconnect_node, node()} end, DisconnectNodes). 
@@ -1276,10 +1576,11 @@ disconnect_nodes(DisconnectNodes) -> %%%==================================================================================== %%% Disconnect nodes not belonging to own global_groups %%%==================================================================================== -force_nodedown(DisconnectNodes) -> +force_nodedown(DisconnectNodes, Conns) -> lists:foreach(fun(Node) -> - erlang:disconnect_node(Node), - global_name_server ! {ignore_node, Node} + CId = maps:get(Node, Conns, not_connected), + global_name_server ! {group_nodedown, Node, CId}, + erlang:disconnect_node(Node) end, DisconnectNodes). @@ -1288,92 +1589,131 @@ force_nodedown(DisconnectNodes) -> %%% Get the current global_groups definition %%%==================================================================================== get_own_nodes_with_errors() -> - case application:get_env(kernel, global_groups) of - undefined -> - {ok, all}; - {ok, []} -> - {ok, all}; - {ok, NodeGrps} -> - case catch config_scan(NodeGrps, publish_type) of - {error, Error} -> - {error, Error}; - {_, _, NodesDef, _} -> - {ok, lists:sort(NodesDef)} - end + case lookup_group_conf(false) of + #gconf{state = {error, Error, _NodeGrps}} -> + {error, Error}; + #gconf{group_list = []} -> + {ok, all}; + #gconf{group_list = Nodes} -> + {ok, Nodes} end. get_own_nodes() -> - case get_own_nodes_with_errors() of - {ok, all} -> - []; - {error, _} -> - []; - {ok, Nodes} -> - Nodes - end. + get_own_nodes(false). + +get_own_nodes(GG) when is_boolean(GG) -> + get_own_nodes(lookup_group_conf(GG)); +get_own_nodes(#gconf{group_list = Nodes}) -> + Nodes. %%%==================================================================================== -%%% -hidden command line argument +%%% Is a group configured? 
%%%==================================================================================== -publish_arg() -> - case net_kernel:dist_listen() of - false -> - hidden; - _ -> - case init:get_argument(hidden) of - {ok,[[]]} -> - hidden; - {ok,[["true"]]} -> - hidden; - _ -> - normal - end + +-spec group_configured() -> boolean(). + +group_configured() -> + group_configured(lookup_group_conf(false)). + +group_configured(GConf) -> + case GConf of + #gconf{state = no_conf} -> + false; + #gconf{} -> + true end. %%%==================================================================================== -%%% Own group publication type and nodes +%%% Is node a participant? +%%% +%%% That is, a node is a participant if it either is a member of our configured group, +%%% or there are no group configured (in which case all nodes are participants). %%%==================================================================================== -own_group() -> - case application:get_env(kernel, global_groups) of - undefined -> - no_group; - {ok, []} -> - no_group; - {ok, NodeGrps} -> - case catch config_scan(NodeGrps, publish_type) of - {error, _} -> - no_group; - {_, PubTpGrp, NodesDef, _} -> - {PubTpGrp, NodesDef} - end - end. +-spec participant(Node::node()) -> boolean(). + +participant(Node) -> + case lookup_group_conf(false) of + #gconf{group_map = all} -> + true; + #gconf{group_map = #{Node := ok}} -> + true; + #gconf{} -> + false + end. %%%==================================================================================== -%%% Help function which computes publication list +%%% Is node member of our configured group? %%%==================================================================================== -publish_on_nodes(normal, no_group) -> - all; -publish_on_nodes(hidden, no_group) -> - []; -publish_on_nodes(normal, {normal, _}) -> - all; -publish_on_nodes(hidden, {_, Nodes}) -> - Nodes; -publish_on_nodes(_, {hidden, Nodes}) -> - Nodes. + +-spec member(Node::node()) -> boolean(). 
+ +member(Node) -> + member(false, Node). + +member(GG, Node) -> + case lookup_group_conf(GG) of + #gconf{group_map = #{Node := ok}} -> + true; + #gconf{} -> + false + end. %%%==================================================================================== -%%% Update net_kernels publication list +%%% Publish on node? %%%==================================================================================== -update_publish_nodes(PubArg) -> - update_publish_nodes(PubArg, no_group). -update_publish_nodes(PubArg, MyGroup) -> - net_kernel:update_publish_nodes(publish_on_nodes(PubArg, MyGroup)). +-spec publish(OwnPublishType, Node) -> boolean() when + OwnPublishType :: 'hidden' | 'normal', + Node :: node(). + +publish(OwnPublishType, Node) when (OwnPublishType == normal + orelse OwnPublishType == hidden) + andalso is_atom(Node) -> + case lookup_group_conf(false) of + #gconf{group_map = all} when OwnPublishType == normal -> + true; + #gconf{group_map = all} when OwnPublishType == hidden -> + false; + #gconf{group_publish_type = normal} when OwnPublishType == normal -> + true; + #gconf{group_map = #{Node := ok}} -> + true; + #gconf{} -> + false + end. %%%==================================================================================== -%%% Fetch publication list +%%% Async rpc request/resonse %%%==================================================================================== -publish_on_nodes() -> - publish_on_nodes(publish_arg(), own_group()). + +%% These have a similar API as erpc:send_request/6 and erpc:check_response/3 +%% introduced in OTP 25 and can easily be replaced by the erpc primitives in +%% OTP 25. + +rpc_send_request(Node, M, F, A, Label, Requests) -> + {_P, Mon} = spawn_monitor(fun () -> + Result = rpc:call(Node, M, F, A), + exit({rpc_call_result__, Result}) + end), + Requests#{Mon => Label}. 
+ +rpc_check_response(_Msg, RequestsMap) when map_size(RequestsMap) == 0 -> + no_request; +rpc_check_response({'DOWN', Mon, process, _Pid, Reason}, RequestsMap) -> + case maps:is_key(Mon, RequestsMap) of + false -> + no_response; + true -> + {Label, NewRequestsMap} = maps:take(Mon, RequestsMap), + case Reason of + {rpc_call_result__, {badrpc, BadRpcReason}} -> + error({BadRpcReason, Label, NewRequestsMap}); + {rpc_call_result__, Res} -> + {Res, Label, NewRequestsMap}; + Error -> + error({Error, Label, NewRequestsMap}) + end + end; +rpc_check_response(_Msg, _RequestMap) -> + no_response. diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src index f78cfbfad6..a931cd3136 100644 --- a/lib/kernel/src/kernel.app.src +++ b/lib/kernel/src/kernel.app.src @@ -156,6 +156,6 @@ {prevent_overlapping_partitions, false} ]}, {mod, {kernel, []}}, - {runtime_dependencies, ["erts-@OTP-17843@", "stdlib-3.13", "sasl-3.0"]} + {runtime_dependencies, ["erts-@OTP-17843:OTP-17934@", "stdlib-3.13", "sasl-3.0"]} ] }. diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index 8dfae3a505..6e8211237b 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -72,7 +72,7 @@ epmd_module/0, dist_listen/0]). --export([disconnect/1, passive_cnct/1]). +-export([disconnect/1, async_disconnect/1, passive_cnct/1]). -export([hidden_connect_node/1]). -export([set_net_ticktime/1, set_net_ticktime/2, get_net_ticktime/0]). @@ -80,7 +80,7 @@ connecttime/0, i/0, i/1, verbose/1]). --export([publish_on_node/1, update_publish_nodes/1]). +-export([publish_on_node/1]). %% Internal exports for spawning processes. @@ -111,7 +111,6 @@ listen, %% list of #listen allowed, %% list of allowed nodes in a restricted system verbose = 0, %% level of verboseness - publish_on_nodes = undefined, dyn_name_pool = #{}, %% Reusable remote node names: #{Host => [{Name,Creation}]} supervisor %% Our supervisor (net_sup | net_sup_dynamic | {restart,Restarter}) }). 
@@ -271,15 +270,41 @@ monitor_nodes(Flag) -> -spec monitor_nodes(Flag, Options) -> ok | Error when Flag :: boolean(), - Options :: [Option], - Option :: {node_type, NodeType} - | nodedown_reason, + Options :: OptionsList | OptionsMap, + OptionsList :: [ListOption], + ListOption :: connection_id + | {node_type, NodeType} + | nodedown_reason, + OptionsMap :: #{connection_id => boolean(), + node_type => NodeType, + nodedown_reason => boolean()}, NodeType :: visible | hidden | all, Error :: error | {error, term()}. monitor_nodes(Flag, Opts) -> - case catch process_flag({monitor_nodes, Opts}, Flag) of - N when is_integer(N) -> ok; - _ -> mk_monitor_nodes_error(Flag, Opts) + try + MapOpts = if is_map(Opts) -> + error = maps:find(list, Opts), + Opts; + is_list(Opts) -> + lists:foldl(fun (nodedown_reason, Acc) -> + Acc#{nodedown_reason => true}; + (connection_id, Acc) -> + Acc#{connection_id => true}; + ({node_type, Val}, Acc) -> + case maps:find(node_type, Acc) of + error -> ok; + {ok, Val} -> ok + end, + Acc#{node_type => Val} + end, + #{list => true}, + Opts) + end, + true = is_integer(process_flag({monitor_nodes, MapOpts}, Flag)), + ok + catch + _:_ -> + mk_monitor_nodes_error(Flag, Opts) end. %% ... @@ -299,13 +324,14 @@ passive_cnct(Node) -> disconnect(Node) -> request({disconnect, Node}). +async_disconnect(Node) -> + gen_server:cast(net_kernel, {disconnect, Node}). + %% Should this node publish itself on Node? publish_on_node(Node) when is_atom(Node) -> - request({publish_on_node, Node}). - -%% Update publication list -update_publish_nodes(Ns) -> - request({update_publish_nodes, Ns}). + global_group:publish(persistent_term:get({?MODULE, publish_type}, + hidden), + Node). -spec connect_node(Node) -> boolean() | ignored when Node :: node(). 
@@ -394,6 +420,8 @@ init({Name, LongOrShortNames, TickT, CleanHalt, NetSup}) -> case init_node(Name, LongOrShortNames, CleanHalt) of {ok, Node, Listeners} -> process_flag(priority, max), + persistent_term:put({?MODULE, publish_type}, + publish_type()), Ticktime = to_integer(TickT), Ticker = spawn_link(net_kernel, ticker, [self(), Ticktime]), {ok, #state{node = Node, @@ -410,6 +438,7 @@ init({Name, LongOrShortNames, TickT, CleanHalt, NetSup}) -> supervisor = NetSup }}; Error -> + _ = persistent_term:erase({?MODULE, publish_type}), {stop, Error} end. @@ -612,25 +641,6 @@ handle_call(longnames, From, State) -> handle_call(nodename, From, State) -> async_reply({reply, State#state.node, State}, From); -handle_call({update_publish_nodes, Ns}, From, State) -> - async_reply({reply, ok, State#state{publish_on_nodes = Ns}}, From); - -handle_call({publish_on_node, Node}, From, State) -> - NewState = case State#state.publish_on_nodes of - undefined -> - State#state{publish_on_nodes = - global_group:publish_on_nodes()}; - _ -> - State - end, - Publish = case NewState#state.publish_on_nodes of - all -> - true; - Nodes -> - lists:member(Node, Nodes) - end, - async_reply({reply, Publish, NewState}, From); - handle_call({verbose, Level}, From, State) -> async_reply({reply, State#state.verbose, State#state{verbose = Level}}, From); @@ -714,6 +724,13 @@ handle_call(_Msg, _From, State) -> %% handle_cast. %% ------------------------------------------------------------ +handle_cast({disconnect, Node}, State) when Node =:= node() -> + {noreply, State}; +handle_cast({disconnect, Node}, State) -> + verbose({disconnect, Node}, 1, State), + {_Reply, State1} = do_disconnect(Node, State), + {noreply, State1}; + handle_cast(_, State) -> {noreply,State}. 
@@ -729,6 +746,7 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------ terminate(Reason, State) -> + _ = persistent_term:erase({?MODULE, publish_type}), case Reason of no_network -> ok; @@ -1329,8 +1347,45 @@ check_options(Opts) when is_list(Opts) -> _ -> {error, {unknown_options, RestOpts2}} end; +check_options(Opts) when is_map(Opts) -> + BadMap0 = case maps:find(connection_id, Opts) of + error -> + Opts; + {ok, CIdBool} when is_boolean(CIdBool) -> + maps:remove(connection_id, Opts); + {ok, BadCIdVal} -> + throw({error, + {bad_option_value, + #{connection_id => BadCIdVal}}}) + end, + BadMap1 = case maps:find(nodedown_reason, BadMap0) of + error -> + BadMap0; + {ok, NRBool} when is_boolean(NRBool) -> + maps:remove(nodedown_reason, BadMap0); + {ok, BadNRVal} -> + throw({error, + {bad_option_value, + #{nodedown_reason => BadNRVal}}}) + end, + BadMap2 = case maps:find(node_type, BadMap1) of + error -> + BadMap1; + {ok, NTVal} when NTVal == visible; NTVal == hidden; NTVal == all -> + maps:remove(node_type, BadMap1); + {ok, BadNTVal} -> + throw({error, + {bad_option_value, + #{node_type => BadNTVal}}}) + end, + if map_size(BadMap2) == 0 -> + {error, internal_error}; + true -> + throw({error, {unknown_options, BadMap2}}) + end; check_options(Opts) -> - {error, {options_not_a_list, Opts}}. + {error, {invalid_options, Opts}}. + mk_monitor_nodes_error(Flag, _Opts) when Flag =/= true, Flag =/= false -> error; @@ -1679,6 +1734,24 @@ epmd_module() -> erl_epmd end. +%%% +%%% publish type +%%% +publish_type() -> + case dist_listen() of + false -> + hidden; + true -> + case init:get_argument(hidden) of + {ok,[[] | _]} -> + hidden; + {ok,[["true" | _] | _]} -> + hidden; + _ -> + normal + end + end. 
+ %% %% dist_listen() -> whether the erlang distribution should listen for connections %% diff --git a/lib/kernel/test/erl_distribution_SUITE.erl b/lib/kernel/test/erl_distribution_SUITE.erl index d5ca3786fe..6f18bac4fe 100644 --- a/lib/kernel/test/erl_distribution_SUITE.erl +++ b/lib/kernel/test/erl_distribution_SUITE.erl @@ -922,8 +922,14 @@ run_tick_change_test(DCfg, B, C, PrevTT, TT) -> hidden_node(Config) when is_list(Config) -> run_dist_configs(fun hidden_node/2, Config). -hidden_node(DCfg, _Config) -> - HArgs = "-hidden", +hidden_node(DCfg, Config) -> + hidden_node(DCfg, "-hidden", Config), + hidden_node(DCfg, "-hidden -hidden", Config), + hidden_node(DCfg, "-hidden true -hidden true", Config), + ok. + +hidden_node(DCfg, HArgs, _Config) -> + ct:pal("--- Hidden argument(s): ~s~n", [HArgs]), {ok, V} = start_node(DCfg, visible_node), VMN = start_monitor_nodes_proc(V), {ok, H} = start_node(DCfg, hidden_node, HArgs), @@ -1259,7 +1265,9 @@ monitor_nodes_misc(DCfg, _Config) -> MonNodeState = monitor_node_state(), ok = net_kernel:monitor_nodes(true), ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]), - ok = net_kernel:monitor_nodes(true, [nodedown_reason, {node_type, all}]), + ok = net_kernel:monitor_nodes(true, [nodedown_reason, {node_type, all}, connection_id]), + ok = net_kernel:monitor_nodes(true, #{node_type => all, nodedown_reason => true}), + ok = net_kernel:monitor_nodes(true, #{node_type => all, nodedown_reason => true, connection_id => true}), Names = get_numbered_nodenames(3, node), [NN1, NN2, NN3] = Names, @@ -1268,27 +1276,90 @@ monitor_nodes_misc(DCfg, _Config) -> receive {nodeup, N1} -> ok end, - receive {nodeup, N1, [{node_type, visible}]} -> ok end, + receive {nodeup, N1, #{node_type := visible}} -> ok end, + receive {nodeup, N2, #{node_type := hidden}} -> ok end, receive {nodeup, N1, [{node_type, visible}]} -> ok end, receive {nodeup, N2, [{node_type, hidden}]} -> ok end, - receive {nodeup, N2, [{node_type, hidden}]} -> ok 
end, + + NodesInfo = erlang:nodes(connected, #{connection_id => true}), + + {N1, #{connection_id := N1CId}} = lists:keyfind(N1, 1, NodesInfo), + {N2, #{connection_id := N2CId}} = lists:keyfind(N2, 1, NodesInfo), + + ct:pal("N1: ~p ~p~n", [N1, N1CId]), + ct:pal("N2: ~p ~p~n", [N2, N2CId]), + + receive {nodeup, N1, #{node_type := visible, connection_id := N1CId}} -> ok end, + receive {nodeup, N2, #{node_type := hidden, connection_id := N2CId}} -> ok end, + + N1UpInfoSorted = lists:sort([{node_type, visible},{connection_id, N1CId}]), + N2UpInfoSorted = lists:sort([{node_type, hidden},{connection_id, N2CId}]), + + receive {nodeup, N1, UpN1Info} -> N1UpInfoSorted = lists:sort(UpN1Info) end, + receive {nodeup, N2, UpN2Info} -> N2UpInfoSorted = lists:sort(UpN2Info) end, stop_node(N1), stop_node(N2), - VisbleDownInfo = lists:sort([{node_type, visible}, - {nodedown_reason, connection_closed}]), - HiddenDownInfo = lists:sort([{node_type, hidden}, - {nodedown_reason, connection_closed}]), - receive {nodedown, N1} -> ok end, - receive {nodedown, N1, Info1A} -> VisbleDownInfo = lists:sort(Info1A) end, - receive {nodedown, N1, Info1B} -> VisbleDownInfo = lists:sort(Info1B) end, - receive {nodedown, N2, Info2A} -> HiddenDownInfo = lists:sort(Info2A) end, - receive {nodedown, N2, Info2B} -> HiddenDownInfo = lists:sort(Info2B) end, + receive {nodedown, N1, #{node_type := visible, + nodedown_reason := connection_closed}} -> ok end, + receive {nodedown, N1, #{node_type := visible, + nodedown_reason := connection_closed, + connection_id := N1CId}} -> ok end, + receive {nodedown, N2, #{node_type := hidden, + nodedown_reason := connection_closed}} -> ok end, + receive {nodedown, N2, #{node_type := hidden, + nodedown_reason := connection_closed, + connection_id := N2CId}} -> ok end, + + N1ADownInfoSorted = lists:sort([{node_type, visible}, + {nodedown_reason, connection_closed}]), + N1BDownInfoSorted = lists:sort([{node_type, visible}, + {nodedown_reason, connection_closed}, + 
{connection_id, N1CId}]), + N2ADownInfoSorted = lists:sort([{node_type, hidden}, + {nodedown_reason, connection_closed}]), + N2BDownInfoSorted = lists:sort([{node_type, hidden}, + {nodedown_reason, connection_closed}, + {connection_id, N2CId}]), + + receive + {nodedown, N1, N1Info1} -> + case lists:sort(N1Info1) of + N1ADownInfoSorted -> + receive + {nodedown, N1, N1Info2} -> + N1BDownInfoSorted = lists:sort(N1Info2) + end; + N1BDownInfoSorted -> + receive + {nodedown, N1, N1Info2} -> + N1ADownInfoSorted = lists:sort(N1Info2) + end + end + end, + receive + {nodedown, N2, N2Info1} -> + case lists:sort(N2Info1) of + N2ADownInfoSorted -> + receive + {nodedown, N2, N2Info2} -> + N2BDownInfoSorted = lists:sort(N2Info2) + end; + N2BDownInfoSorted -> + receive + {nodedown, N2, N2Info2} -> + N2ADownInfoSorted = lists:sort(N2Info2) + end + end + end, ok = net_kernel:monitor_nodes(false, [{node_type, all}, nodedown_reason]), + ok = net_kernel:monitor_nodes(false, [nodedown_reason, {node_type, all}, connection_id]), + ok = net_kernel:monitor_nodes(false, #{node_type => all, nodedown_reason => true}), + ok = net_kernel:monitor_nodes(false, #{node_type => all, nodedown_reason => true, connection_id => true}), {ok, N3} = start_node(DCfg, NN3), receive {nodeup, N3} -> ok end, @@ -1424,7 +1495,11 @@ monitor_nodes_errors(Config) when is_list(Config) -> [gurka]}} = net_kernel:monitor_nodes(true, [gurka]), {error, - {options_not_a_list, + {unknown_options, + #{gurka := true}}} = net_kernel:monitor_nodes(true, + #{gurka => true}), + {error, + {invalid_options, gurka}} = net_kernel:monitor_nodes(true, gurka), {error, @@ -1446,6 +1521,10 @@ monitor_nodes_errors(Config) when is_list(Config) -> {node_type, blaha}}} = net_kernel:monitor_nodes(true, [{node_type, blaha}]), + {error, + {bad_option_value, + #{node_type := blaha}}} + = net_kernel:monitor_nodes(true, #{node_type => blaha}), MonNodeState = monitor_node_state(), ok. 
diff --git a/lib/kernel/test/global_SUITE.erl b/lib/kernel/test/global_SUITE.erl index c0c9f4b912..80767de4f6 100644 --- a/lib/kernel/test/global_SUITE.erl +++ b/lib/kernel/test/global_SUITE.erl @@ -30,7 +30,7 @@ ring/1, simple_ring/1, line/1, simple_line/1, global_lost_nodes/1, otp_1849/1, otp_3162/1, otp_5640/1, otp_5737/1, - otp_6931/1, + connect_all_false/1, simple_disconnect/1, simple_resolve/1, simple_resolve2/1, simple_resolve3/1, leftover_name/1, re_register_name/1, name_exit/1, external_nodes/1, @@ -77,7 +77,7 @@ all() -> advanced_partition, basic_name_partition, stress_partition, simple_ring, simple_line, ring, line, global_lost_nodes, otp_1849, otp_3162, otp_5640, - otp_5737, otp_6931, simple_disconnect, simple_resolve, + otp_5737, connect_all_false, simple_disconnect, simple_resolve, simple_resolve2, simple_resolve3, leftover_name, re_register_name, name_exit, external_nodes, many_nodes, sync_0, global_groups_change, register_1, both_known_1, @@ -2067,16 +2067,25 @@ otp_5737(Config) when is_list(Config) -> init_condition(Config), ok. -%% OTP-6931. Ignore nodeup when connect_all=false. -otp_6931(Config) when is_list(Config) -> +connect_all_false(Config) when is_list(Config) -> + %% OTP-6931. Ignore nodeup when connect_all=false. + connect_all_false_test("-connect_all false", Config), + %% OTP-17934: multiple -connect_all false and kernel parameter connect_all + connect_all_false_test("-connect_all false -connect_all false", Config), + connect_all_false_test("-kernel connect_all false", Config), + ok. + +connect_all_false_test(CAArg, Config) -> Me = self(), {ok, CAf} = start_non_connecting_node(ca_false, Config), + {ok, false} = rpc:call(CAf, application, get_env, [kernel, connect_all]), ok = rpc:call(CAf, error_logger, add_report_handler, [?MODULE, Me]), info = rpc:call(CAf, error_logger, warning_map, []), - {global_name_server,CAf} ! {nodeup, fake_node}, + {global_name_server,CAf} ! 
{nodeup, fake_node, #{connection_id => 4711}}, timer:sleep(100), stop_node(CAf), - receive {nodeup,fake_node} -> ct:fail({info_report, was, sent}) + receive {nodeup,fake_node, _} -> + ct:fail({info_report, was, sent}) after 1000 -> ok end, ok. @@ -3204,8 +3213,9 @@ global_groups_change(Config) -> Config2 = filename:join(Dir, "sys2"), {ok, CpC} = start_node_boot(NcpC, Config2, dc), - sync_and_wait(CpA), - sync_and_wait(CpD), + gg_sync_and_wait(Cp1, [Cp2], [], [mk_node(Ncp5, M)]), + gg_sync_and_wait(CpA, [CpB], [], []), + gg_sync_and_wait(CpD, [CpC, CpE], [], []), pong = rpc:call(CpA, net_adm, ping, [CpC]), pong = rpc:call(CpC, net_adm, ping, [CpB]), @@ -3345,6 +3355,9 @@ global_groups_change(Config) -> Info1ok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [Info1ok, Info1]), ct:fail({{"could not change the global groups" " in node", Cp1}, {Info1, Info1ok}}) end, @@ -3353,6 +3366,9 @@ global_groups_change(Config) -> Info2ok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [Info2ok, Info2]), ct:fail({{"could not change the global groups" " in node", Cp2}, {Info2, Info2ok}}) end, @@ -3361,6 +3377,9 @@ global_groups_change(Config) -> Info3ok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [Info3ok, Info3]), ct:fail({{"could not change the global groups" " in node", Cp3}, {Info3, Info3ok}}) end, @@ -3369,6 +3388,9 @@ global_groups_change(Config) -> InfoAok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [InfoAok, InfoA]), ct:fail({{"could not change the global groups" " in node", CpA}, {InfoA, InfoAok}}) end, @@ -3377,6 +3399,9 @@ global_groups_change(Config) -> InfoBok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [InfoBok, InfoB]), ct:fail({{"could not change the global groups" " in node", CpB}, {InfoB, InfoBok}}) end, @@ -3386,6 +3411,9 @@ global_groups_change(Config) -> InfoCok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [InfoCok, InfoC]), ct:fail({{"could not change the global groups" " in node", CpC}, {InfoC, 
InfoCok}}) end, @@ -3394,6 +3422,9 @@ global_groups_change(Config) -> InfoDok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [InfoDok, InfoD]), ct:fail({{"could not change the global groups" " in node", CpD}, {InfoD, InfoDok}}) end, @@ -3402,6 +3433,9 @@ global_groups_change(Config) -> InfoEok -> ok; _ -> + ct:pal("Expected: ~p~n" + "Got : ~p~n", + [InfoEok, InfoE]), ct:fail({{"could not change the global groups" " in node", CpE}, {InfoE, InfoEok}}) end, @@ -3419,27 +3453,30 @@ global_groups_change(Config) -> init_condition(Config), ok. -sync_and_wait(Node) -> - Ref = make_ref(), - Self = self(), - spawn(Node, fun () -> - global_group:sync(), - case whereis(global_group_check) of - P when is_pid(P) -> - Self ! {Ref, P}; - _ -> - Self ! {Ref, done} - end - end), - receive - {Ref, P} when is_pid(P) -> - MonRef = erlang:monitor(process, P), - receive - {'DOWN',MonRef,process,P,_} -> - ok - end; - {Ref, _} -> - ok +gg_sync_and_wait(Node, Synced, SyncError, NoContact) -> + ok = rpc:call(Node, global_group, sync, []), + gg_wait(Node, Synced, SyncError, NoContact). + +gg_wait(Node, Synced, SyncError, NoContact) -> + receive after 100 -> ok end, + try + GGInfo = rpc:call(Node, global_group, info, []), + ct:pal("GG info: ~p~n", [GGInfo]), + case proplists:lookup(synced_nodes, GGInfo) of + {synced_nodes, Synced} -> ok; + _ -> throw(wait) + end, + case proplists:lookup(sync_error, GGInfo) of + {sync_error, SyncError} -> ok; + _ -> throw(wait) + end, + case proplists:lookup(no_contact, GGInfo) of + {no_contact, NoContact} -> ok; + _ -> throw(wait) + end + catch + throw:wait -> + gg_wait(Node, Synced, SyncError, NoContact) end. %%% Copied from init_SUITE.erl. @@ -4437,7 +4474,7 @@ trace_message(M) -> init(Tester) -> {ok, Tester}. -handle_event({_, _GL, {_Pid,_String,[{nodeup,fake_node}=Msg]}}, Tester) -> +handle_event({_, _GL, {_Pid,_String,[{nodeup,fake_node,_}=Msg]}}, Tester) -> Tester ! 
Msg, {ok, Tester}; handle_event(_Event, State) -> diff --git a/lib/sasl/test/release_handler_SUITE.erl b/lib/sasl/test/release_handler_SUITE.erl index 5ce4050d30..bceff95d8a 100644 --- a/lib/sasl/test/release_handler_SUITE.erl +++ b/lib/sasl/test/release_handler_SUITE.erl @@ -1814,15 +1814,34 @@ upgrade_gg(Conf) -> %% start gg2 and gg6 [Gg2,Gg6] = start_nodes(Conf,[Gg2Sname,Gg6Sname],"upgrade_gg start gg2/gg6"), + %% Watch dog pulling out some more information in case we hang... + Nodes3 = [Gg1,Gg2,Gg4,Gg5,Gg6], + Tester = self(), + WD = spawn_link(fun () -> + receive after 7*60*1000 -> ok end, + ct:pal("7 minutes passed...~n", []), + erlang:suspend_process(Tester), + load_suite(Nodes3), + dump_info([node()|Nodes3]), + exit(operation_hang) + end), + %% reg proc on each of the nodes ok = rpc:call(Gg2, installer, reg_proc, [reg2]), ok = rpc:call(Gg6, installer, reg_proc, [reg6]), - are_names_reg_gg(Gg1, [reg1, reg2, reg4, reg5, reg6]), %% Check global group info - Nodes3 = [Gg1,Gg2,Gg4,Gg5,Gg6], [check_gg_info(Node,Nodes3,[],Nodes3--[Node]) || Node <- Nodes3], + OkList = lists:map(fun (_) -> ok end, Nodes3), + {OkList,[]} = rpc:multicall(Nodes3, global, sync, []), + + are_names_reg_gg(Gg1, [reg1, reg2, reg4, reg5, reg6]), + + unlink(WD), + exit(WD, kill), + false = is_process_alive(WD), + ok. upgrade_gg(cleanup,Config) -> @@ -1830,6 +1849,53 @@ upgrade_gg(cleanup,Config) -> NodeNames = [node_name(Sname) || Sname <- Snames], ok = stop_nodes(NodeNames). +load_suite(Nodes) -> + {ok,Bin}=file:read_file(code:which(?MODULE)), + _ = rpc:multicall(Nodes, erlang, load_module, [?MODULE, Bin]), + ok. + +dump_info(Nodes) -> + GetLockerState = fun (TheLocker) -> + Mon = erlang:monitor(process, TheLocker), + TheLocker ! 
{get_state, self(), Mon}, + receive + Msg when element(1, Msg) =:= Mon -> + erlang:demonitor(Mon, [flush]), + RList = tl(erlang:tuple_to_list(Msg)), + erlang:list_to_tuple([state | RList]); + {'DOWN', Mon, process, TheLocker, Reason} -> + {error, Reason} + after 60*1000 -> + erlang:demonitor(Mon, [flush]), + {error, timeout} + end + end, + GI = rpc:multicall(Nodes, + erlang, + apply, + [fun () -> + GlobalLocker = global:get_locker(), + {node(), + #{global_state => global:info(), + global_dict => process_info(whereis(global_name_server), dictionary), + global_locks_tab => ets:tab2list(global_locks), + global_names_tab => ets:tab2list(global_names), + global_names_ext_tab => ets:tab2list(global_names_ext), + global_pid_names_tab => ets:tab2list(global_pid_names), + global_pid_ids_tab => ets:tab2list(global_pid_ids), + global_lost_connections_tab => ets:tab2list(global_lost_connections), + global_node_resources_tag => ets:tab2list(global_node_resources), + global_locker_state => GetLockerState(GlobalLocker), + global_locker_info => process_info(GlobalLocker, + [status, + current_stacktrace, + messages, + dictionary]), + global_group_info => global_group:info()}} + end, + []], + 2*60*1000), + ct:pal("GI: ~p~n", [GI]). 
%%%----------------------------------------------------------------- %%% OTP-10463, Bug - release_handler could not handle regexp in appup @@ -2583,48 +2649,31 @@ check_gg_info(Node,OtherAlive,OtherDead,Synced) -> check_gg_info(Node,OtherAlive,OtherDead,Synced,N) -> GGI = rpc:call(Node, global_group, info, []), - GI = rpc:call(Node, global, info,[]), - try do_check_gg_info(OtherAlive,OtherDead,Synced,GGI,GI) - catch _:E:Stacktrace when N==0 -> + try do_check_gg_info(OtherAlive,OtherDead,Synced,GGI) + catch _:E:Stacktrace -> ?t:format("~nERROR: check_gg_info failed for ~p:~n~p~n" - "when GGI was: ~p~nand GI was: ~p~n", - [Node,{E,Stacktrace},GGI,GI]), - ?t:fail("check_gg_info failed"); - _:E:Stacktrace -> - ?t:format("~nWARNING: check_gg_info failed for ~p:~n~p~n" - "when GGI was: ~p~nand GI was: ~p~n", - [Node,{E,Stacktrace},GGI,GI]), - timer:sleep(1000), - check_gg_info(Node,OtherAlive,OtherDead,Synced,N-1) + "when GGI was: ~p~n", + [Node,{E,Stacktrace},GGI]), + if N == 0 -> + ?t:fail("check_gg_info failed"); + true -> + ok = rpc:call(Node, global_group, sync, []), + timer:sleep(1000), + check_gg_info(Node,OtherAlive,OtherDead,Synced,N-1) + end end. 
-do_check_gg_info(OtherAlive,OtherDead,Synced,GGI,GI) -> +do_check_gg_info(OtherAlive,OtherDead,Synced,GGI) -> {_,gg1} = lists:keyfind(own_group_name,1,GGI), {_,synced} = lists:keyfind(state,1,GGI), {_,AllNodes} = lists:keyfind(own_group_nodes,1,GGI), true = lists:sort(AllNodes) =:= lists:sort(OtherAlive++OtherDead), {_,[]} = lists:keyfind(sync_error,1,GGI), {_,[{gg2,[_,_]}]} = lists:keyfind(other_groups,1,GGI), - - %% There is a known bug in global_group (OTP-9177) which causes - %% the following to fail every now and then: - %% {_,SyncedNodes} = lists:keyfind(synced_nodes,1,GGI), - %% true = lists:sort(SyncedNodes) =:= lists:sort(Synced), - %% {_,NoContact} = lists:keyfind(no_contact,1,GGI), - %% true = lists:sort(NoContact) =:= lists:sort(OtherDead), - - %% Therefore we use global:info instead for this part - {state,_,_,SyncedNodes,_,_,_,_,_,_,_} = GI, + {_,SyncedNodes} = lists:keyfind(synced_nodes,1,GGI), true = lists:sort(SyncedNodes) =:= lists:sort(Synced), - - %% .. and we only check that all OtherDead are listed as - %% no_contact (due to th bug there might be more nodes in this - %% list) {_,NoContact} = lists:keyfind(no_contact,1,GGI), - true = - lists:sort(OtherDead) =:= - lists:sort([NC || NC <- NoContact,lists:member(NC,OtherDead)]), - + true = lists:sort(NoContact) =:= lists:sort(OtherDead), ok. 
%% Return the configuration (to be inserted in sys.config) for global group tests diff --git a/lib/stdlib/src/erl_internal.erl b/lib/stdlib/src/erl_internal.erl index 6ff5e23ee3..fb18721359 100644 --- a/lib/stdlib/src/erl_internal.erl +++ b/lib/stdlib/src/erl_internal.erl @@ -352,6 +352,7 @@ bif(node, 0) -> true; bif(node, 1) -> true; bif(nodes, 0) -> true; bif(nodes, 1) -> true; +bif(nodes, 2) -> true; bif(now, 0) -> true; bif(open_port, 2) -> true; bif(pid_to_list, 1) -> true; diff --git a/lib/stdlib/src/stdlib.app.src b/lib/stdlib/src/stdlib.app.src index b59e3b28c0..1af985250a 100644 --- a/lib/stdlib/src/stdlib.app.src +++ b/lib/stdlib/src/stdlib.app.src @@ -109,6 +109,6 @@ dets]}, {applications, [kernel]}, {env, []}, - {runtime_dependencies, ["sasl-3.0","kernel-7.0","erts-11.0","crypto-3.3", + {runtime_dependencies, ["sasl-3.0","kernel-7.0","erts-@OTP-17934@","crypto-3.3", "compiler-5.0"]} ]}. |