diff options
29 files changed, 1569 insertions, 671 deletions
diff --git a/erts/doc/src/erl_dist_protocol.xml b/erts/doc/src/erl_dist_protocol.xml index a54f9dca27..eeb0049f4f 100644 --- a/erts/doc/src/erl_dist_protocol.xml +++ b/erts/doc/src/erl_dist_protocol.xml @@ -155,13 +155,13 @@ </item> <tag><c>HighestVersion</c></tag> <item> - <p>The highest distribution version that this node can handle. - The value in OTP 23 and later is 6.</p> + <p>The highest distribution protocol version this node can handle. + The value in OTP 23 and later is 6. Older nodes only support version 5.</p> </item> <tag><c>LowestVersion</c></tag> <item> <p>The lowest distribution version that this node can handle. - The value in OTP 23 and later is 5.</p> + Should be 5 to support connections to nodes older than OTP 23.</p> </item> <tag><c>Nlen</c></tag> <item> @@ -187,7 +187,7 @@ <p>The response message is either <c>ALIVE2_X_RESP</c> or <c>ALIVE2_RESP</c> depending on distribution version. If both the node - and EPMD support distribution version 6 then response is + and EPMD support distribution version 6 then the response is <c>ALIVE2_X_RESP</c> otherwise it is the older <c>ALIVE2_RESP</c>:</p> <table align="left"> @@ -549,8 +549,14 @@ io:format("old/unused name ~ts at port ~p, fd = ~p ~n", <section> <marker id="distribution_handshake"/> <title>Distribution Handshake</title> - <p>This section describes the distribution handshake protocol introduced - in Erlang/OTP R6. The handshake has remained almost the same since then.</p> + <p> + This section describes the distribution handshake protocol used between + nodes to establishing a connection. The protocol was introduced in + Erlang/OTP R6 and has remained unchanged until OTP 23. The changes made in + OTP 23 were designed to be compatible with the older protocol + version. That is an old node can still connect toward a new node and vice + versa. + </p> <section> <title>General</title> @@ -617,19 +623,68 @@ io:format("old/unused name ~ts at port ~p, fd = ~p ~n", <tag>2) <c>send_name</c>/<c>receive_name</c></tag> <item> <p><c>A</c> sends an initial identification to <c>B</c>, which - receives the message. The message looks as follows (every "square" - is one byte and the packet header is removed):</p> - <pre> -+---+--------+--------+-----+-----+-----+-----+-----+-----+-...-+-----+ -|'n'|Version0|Version1|Flag0|Flag1|Flag2|Flag3|Name0|Name1| ... |NameN| -+---+--------+--------+-----+-----+-----+-----+-----+-----+-... +-----+</pre> - <p>'n' is the message tag. 'Version0' and 'Version1' is the - distribution version selected by <c>A</c>, based on information - from the EPMD. (16-bit big-endian) 'Flag0' ... 'Flag3' are - capability flags, the capabilities are defined in - <c>$ERL_TOP/lib/kernel/include/dist.hrl</c>. (32-bit big-endian) - 'Name0' ... 'NameN' is the full node name of <c>A</c>, as a string - of bytes (the packet length denotes how long it is).</p> + receives the message. The message can have two different formats + which looks as follows (the packet headers are removed): + </p> + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">2</cell> + <cell align="center">4</cell> + <cell align="center">Nlen</cell> + </row> + <row> + <cell align="center"><c>'n'</c></cell> + <cell align="center"><c>Version=5</c></cell> + <cell align="center"><c>Flags</c></cell> + <cell align="center"><c>Name</c></cell> + </row> + <tcaption>Old send_name ('n') for protocol version 5</tcaption> + </table> + + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">8</cell> + <cell align="center">4</cell> + <cell align="center">2</cell> + <cell align="center">Nlen</cell> + </row> + <row> + <cell align="center"><c>'N'</c></cell> + <cell align="center"><c>Flags</c></cell> + <cell align="center"><c>Creation</c></cell> + <cell align="center"><c>Nlen</c></cell> + <cell align="center"><c>Name</c></cell> + </row> + <tcaption>New send_name ('N') for protocol version 6</tcaption> + </table> + + <p> + The old <c>send_name</c> format is sent from nodes only supporting version 5 + or to nodes that might only support version 5. The <c>Version</c> is + a 16-bit big endian integer and <em>must</em> always have the value 5, even + if node <c>A</c> supports version 6. <c>Flags</c> are the + <seealso marker="#dflags">capability flags</seealso> + of node <c>A</c> in 32-bit big endian. The flag bit + <seealso marker="#DFLAG_HANDSHAKE_23"><c>DFLAG_HANDSHAKE_23</c></seealso> + should be set if node <c>A</c> supports version 6. + <c>Name</c> is the full node name of <c>A</c>, as a string of bytes + (the packet length denotes how long it is). + </p> + <p> + The new <c>send_name</c> is only sent from nodes supporting version 6 to + nodes known to support version 6. <c>Flags</c> are the + <seealso marker="#dflags">capability flags</seealso> of node + <c>A</c> in 64-bit big endian. The flag bit + <seealso marker="#DFLAG_HANDSHAKE_23"><c>DFLAG_HANDSHAKE_23</c></seealso> + must always be set. <c>Creation</c> is the node incarnation + identifier used by node <c>A</c> to create its pids, ports and + references. <c>Name</c> is the full node name of <c>A</c>, as a + string of bytes. <c>Nlen</c> is the byte length of the node name in + 16-bit big endian. Any extra data after the node <c>Name</c> must be + accepted and ignored. + </p> </item> <tag>3) <c>recv_status</c>/<c>send_status</c></tag> <item> @@ -666,13 +721,19 @@ io:format("old/unused name ~ts at port ~p, fd = ~p ~n", node <c>B</c>. See step 3B below.</p> </item> </taglist> - <p>The format of the status message is as follows:</p> - <pre> -+---+-------+-------+-...-+-------+ -|'s'|Status0|Status1| ... |StatusN| -+---+-------+-------+-...-+-------+</pre> - <p>'s' is the message tag. 'Status0' ... 'StatusN' is the status as a - string (not terminated).</p> + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">Slen</cell> + </row> + <row> + <cell align="center"><c>'s'</c></cell> + <cell align="center"><c>Status</c></cell> + </row> + <tcaption>The format of the status message</tcaption> + </table> + <p>'s' is the message tag. <c>Status</c> is the status as a + string (not null terminated).</p> </item> <tag>3B) <c>send_status</c>/<c>recv_status</c></tag> <item> @@ -688,39 +749,136 @@ io:format("old/unused name ~ts at port ~p, fd = ~p ~n", handshake continues with <c>B</c> sending <c>A</c> another message, the challenge. The challenge contains the same type of information as the "name" message initially sent from <c>A</c> to <c>B</c>, plus - a 32-bit challenge:</p> - <pre> -+---+--------+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-...-+-----+ -|'n'|Version0|Version1|Flag0|Flag1|Flag2|Flag3|Chal0|Chal1|Chal2|Chal3|Name0|Name1| ... |NameN| -+---+--------+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-... +-----+</pre> - <p>'Chal0' ... 'Chal3' is the challenge as a 32-bit big-endian integer - and the other fields are <c>B</c>'s version, flags, and full node - name.</p> + a 32-bit challenge. The challenge message can have two different + formats: + </p> + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">2</cell> + <cell align="center">4</cell> + <cell align="center">4</cell> + <cell align="center">Nlen</cell> + </row> + <row> + <cell align="center"><c>'n'</c></cell> + <cell align="center"><c>Version=5</c></cell> + <cell align="center"><c>Flags</c></cell> + <cell align="center"><c>Challenge</c></cell> + <cell align="center"><c>Name</c></cell> + </row> + <tcaption>The old challenge message format (version 5)</tcaption> + </table> + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">8</cell> + <cell align="center">4</cell> + <cell align="center">4</cell> + <cell align="center">2</cell> + <cell align="center">Nlen</cell> + </row> + <row> + <cell align="center"><c>'N'</c></cell> + <cell align="center"><c>Flags</c></cell> + <cell align="center"><c>Challenge</c></cell> + <cell align="center"><c>Creation</c></cell> + <cell align="center"><c>Nlen</c></cell> + <cell align="center"><c>Name</c></cell> + </row> + <tcaption>The new challenge message format (version 6)</tcaption> + </table> + <p> + The old challenge message is sent from old <c>B</c> nodes + (supporting only version 5) or if node <c>A</c> had not capability flag + <seealso marker="#DFLAG_HANDSHAKE_23"><c>DFLAG_HANDSHAKE_23</c></seealso> + set. The <c>Version</c> is a 16-bit big endian integer and + <c>must</c> always have the value 5. + </p> + <p> + The new challenge message is sent from new <c>B</c> nodes if node + <c>A</c> had capability flag <seealso marker="#DFLAG_HANDSHAKE_23"> + <c>DFLAG_HANDSHAKE_23</c></seealso> set. Any extra data after the + node <c>Name</c> must be accepted and ignored. + </p> + <p> + <c>Challenge</c> is a 32-bit big-endian integer. The other fields + are node <c>B</c>'s flags, creation and full node name, similar to + the <c>send_name</c> message. + </p> + </item> + + <tag>4B) <c>send_complement</c>/<c>recv_complement</c></tag> + <item> + <p> + The complement message, from <c>A</c> to <c>B</c>, is only sent if + node <c>A</c> initially sent an old name message and received back a + new challenge message from node <c>B</c>. It contains complementary + information missing in the initial old name message from node <c>A</c>. + </p> + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">4</cell> + <cell align="center">4</cell> + </row> + <row> + <cell align="center"><c>'c'</c></cell> + <cell align="center"><c>FlagsHigh</c></cell> + <cell align="center"><c>Creation</c></cell> + </row> + <tcaption>The complement message</tcaption> + </table> + <p> + <c>FlagsHigh</c> are the high capability flags (bit 33-64) of node + <c>A</c> as a 32-bit big endian integer. <c>Creation</c> is the + incarnation identifier of node <c>A</c>. + </p> </item> + <tag>5) <c>send_challenge_reply</c>/<c>recv_challenge_reply</c></tag> <item> <p>Now <c>A</c> has generated a digest and its own challenge. Those are sent together in a package to <c>B</c>:</p> - <pre> -+---+-----+-----+-----+-----+-----+-----+-----+-----+-...-+------+ -|'r'|Chal0|Chal1|Chal2|Chal3|Dige0|Dige1|Dige2|Dige3| ... |Dige15| -+---+-----+-----+-----+-----+-----+-----+-----+-----+-...-+------+</pre> - <p>'r' is the tag. 'Chal0' ... 'Chal3' is <c>A</c>'s challenge for - <c>B</c> to handle. 'Dige0' ... 'Dige15' is the digest that <c>A</c> - constructed from the challenge <c>B</c> sent in the previous - step.</p> + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">4</cell> + <cell align="center">16</cell> + </row> + <row> + <cell align="center"><c>'r'</c></cell> + <cell align="center"><c>Challenge</c></cell> + <cell align="center"><c>Digest</c></cell> + </row> + <tcaption>The challenge_reply message</tcaption> + </table> + <p> + <c>Challenge</c> is <c>A</c>'s challenge for <c>B</c> to + handle. <c>Digest</c> is the MD5 digest that <c>A</c> constructed + from the challenge <c>B</c> sent in the previous step. + </p> </item> <tag>6) <c>recv_challenge_ack</c>/<c>send_challenge_ack</c></tag> <item> <p><c>B</c> checks that the digest received from <c>A</c> is correct and generates a digest from the challenge received from <c>A</c>. The digest is then sent to <c>A</c>. The message is as follows:</p> - <pre> -+---+-----+-----+-----+-----+-...-+------+ -|'a'|Dige0|Dige1|Dige2|Dige3| ... |Dige15| -+---+-----+-----+-----+-----+-...-+------+</pre> - <p>'a' is the tag. 'Dige0' ... 'Dige15' is the digest calculated by - <c>B</c> for <c>A</c>'s challenge.</p> + <table align="left"> + <row> + <cell align="center">1</cell> + <cell align="center">16</cell> + </row> + <row> + <cell align="center"><c>'a'</c></cell> + <cell align="center"><c>Digest</c></cell> + </row> + <tcaption>The challenge_ack message</tcaption> + </table> + <p> + <c>Digest</c> is the digest calculated by <c>B</c> for <c>A</c>'s + challenge. + </p> </item> <tag>7) check</tag> <item> @@ -746,10 +904,15 @@ recv_status (if status was 'alive' send_status - - - - - - - - - - - - - - - - - -> recv_status) - ChB = gen_challenge() - (ChB) + + (ChB) ChB = gen_challenge() <---------------------------------------------- send_challenge recv_challenge + +(if old send_name and new recv_challenge + send_complement - - - - - - - - - - - - - - - -> + recv_complement) + ChA = gen_challenge(), OCA = out_cookie(B), DiA = gen_digest(ChB, OCA) @@ -898,7 +1061,13 @@ DiB == gen_digest(ChA, ICA)? <p>Use <seealso marker="erl_ext_dist#fragments">fragmented</seealso> distribution messages to send large messages.</p> </item> - <tag><marker id="DFLAG_SPAWN"/><c>-define(DFLAG_SPAWN, 16#1000000).</c></tag> + <marker id="DFLAG_HANDSHAKE_23"/> + <tag><c>-define(DFLAG_HANDSHAKE_23, 16#1000000).</c></tag> + <item> + <p>The node supports the new connection setup handshake (version 6) + introduced in OTP 23.</p> + </item> + <tag><marker id="DFLAG_SPAWN"/><c>-define(DFLAG_SPAWN, 16#100000000).</c></tag> <item> <p>Set if the <seealso marker="#SPAWN_REQUEST"><c>SPAWN_REQUEST</c></seealso>, <seealso marker="#SPAWN_REQUEST_TT"><c>SPAWN_REQUEST_TT</c></seealso>, @@ -1055,12 +1224,6 @@ DiB == gen_digest(ChA, ICA)? <p><c>{8, FromPid, ToPid, Reason}</c></p> <p>This signal is sent by a call to the erlang:exit/2 bif</p> </item> - </taglist> - </section> - - <section> - <title>New Ctrlmessages for distrvsn = 1 (Erlang/OTP R4)</title> - <taglist> <tag><c>SEND_TT</c></tag> <item> <p><c>{12, Unused, ToPid, TraceToken}</c></p> @@ -1081,24 +1244,6 @@ DiB == gen_digest(ChA, ICA)? <item> <p><c>{18, FromPid, ToPid, TraceToken, Reason}</c></p> </item> - </taglist> - </section> - - <section> - <title>New Ctrlmessages for distrvsn = 2</title> - <p><c>distrvsn</c> 2 was never used.</p> - </section> - - <section> - <title>New Ctrlmessages for distrvsn = 3 (Erlang/OTP R5C)</title> - <p>None, but the version number was increased anyway.</p> - </section> - - <section> - <title>New Ctrlmessages for distrvsn = 4 (Erlang/OTP R6)</title> - <p>These are only recognized by Erlang nodes, not by hidden nodes.</p> - - <taglist> <tag><c>MONITOR_P</c></tag> <item> <p><c>{19, FromPid, ToProc, Ref}</c>, where diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 7e36cf925a..3c52716fae 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -196,7 +196,7 @@ bif erts_internal:scheduler_wall_time/1 bif erts_internal:dirty_process_handle_signals/1 -bif erts_internal:create_dist_channel/4 +bif erts_internal:create_dist_channel/3 bif erts_internal:ets_super_user/1 @@ -764,3 +764,9 @@ bif erts_internal:ets_raw_next/2 bif erts_internal:abort_pending_connection/2 + +# +# New in 23 +# + +bif erts_internal:get_creation/0 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index f411da3f36..03e2e0b116 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -961,7 +961,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ErtsAtomCache *cache; ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - Uint32 flags; + Uint64 flags; erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); erts_de_rwlock(dep); @@ -991,7 +991,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) dep->sequences = NULL; nodename = dep->sysname; - flags = dep->flags; + flags = dep->dflags; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL); cache = dep->cache; @@ -1081,14 +1081,27 @@ void init_dist(void) am_erts_internal, am_spawn_request_yield, 3, spawn_request_yield_3); { - Eterm* hp = erts_alloc(ERTS_ALC_T_LITERAL, (1+6)*sizeof(Eterm)); - erts_dflags_record = TUPLE6(hp, am_erts_dflags, - make_small(DFLAG_DIST_DEFAULT), - make_small(DFLAG_DIST_MANDATORY), - make_small(DFLAG_DIST_ADDABLE), - make_small(DFLAG_DIST_REJECTABLE), - make_small(DFLAG_DIST_STRICT_ORDER)); - erts_set_literal_tag(&erts_dflags_record, hp, (1+6)); + Eterm *hp_start, *hp, **hpp = NULL; + Uint sz = 0, *szp = &sz; + while (1) { + erts_dflags_record = + erts_bld_tuple(hpp, szp, 6, + am_erts_dflags, + erts_bld_uint64(hpp, szp, DFLAG_DIST_DEFAULT), + erts_bld_uint64(hpp, szp, DFLAG_DIST_MANDATORY), + erts_bld_uint64(hpp, szp, DFLAG_DIST_ADDABLE), + erts_bld_uint64(hpp, szp, DFLAG_DIST_REJECTABLE), + erts_bld_uint64(hpp, szp, DFLAG_DIST_STRICT_ORDER)); + if (hpp) { + ASSERT(is_value(erts_dflags_record)); + ASSERT(hp == hp_start + sz); + erts_set_literal_tag(&erts_dflags_record, hp_start, sz); + break; + } + hp = hp_start = erts_alloc(ERTS_ALC_T_LITERAL, sz*sizeof(Eterm)); + hpp = &hp; + szp = NULL; + } } } @@ -1305,14 +1318,14 @@ erts_dsig_send_m_exit(ErtsDSigSendContext *ctx, Eterm watcher, Eterm watched, { Eterm ctl, msg; - if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~ctx->dflags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor) */ return ERTS_DSIG_SEND_OK; } - if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { + if (ctx->dep->dflags & DFLAG_EXIT_PAYLOAD) { ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT), watched, watcher, ref); msg = reason; @@ -1334,7 +1347,7 @@ erts_dsig_send_monitor(ErtsDSigSendContext *ctx, Eterm watcher, Eterm watched, { Eterm ctl; - if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~ctx->dflags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P. * Just avoid sending it and by doing that reduce this monitor @@ -1360,7 +1373,7 @@ erts_dsig_send_demonitor(ErtsDSigSendContext *ctx, Eterm watcher, { Eterm ctl; - if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~ctx->dflags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor) */ @@ -1377,7 +1390,7 @@ erts_dsig_send_demonitor(ErtsDSigSendContext *ctx, Eterm watcher, static int can_send_seqtrace_token(ErtsDSigSendContext* ctx, Eterm token) { Eterm label; - if (ctx->flags & DFLAG_BIG_SEQTRACE_LABELS) { + if (ctx->dflags & DFLAG_BIG_SEQTRACE_LABELS) { /* The other end is capable of handling arbitrary seq_trace labels. */ return 1; } @@ -1438,7 +1451,7 @@ erts_dsig_send_msg(ErtsDSigSendContext* ctx, Eterm remote, Eterm message) send_token = (token != NIL && can_send_seqtrace_token(ctx, token)); - if (ctx->flags & DFLAG_SEND_SENDER) { + if (ctx->dflags & DFLAG_SEND_SENDER) { dist_op = make_small(send_token ? DOP_SEND_SENDER_TT : DOP_SEND_SENDER); @@ -1540,20 +1553,20 @@ erts_dsig_send_exit_tt(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, DTRACE_CHARBUF(reason_str, 128); #endif - if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) + if (ctx->dep->dflags & DFLAG_EXIT_PAYLOAD) msg = reason; if (have_seqtrace(token)) { seq_trace_update_serial(ctx->c_p); seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local); - if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { + if (ctx->dep->dflags & DFLAG_EXIT_PAYLOAD) { ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT_TT), local, remote, token); } else ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_EXIT_TT), local, remote, token, reason); } else { - if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) + if (ctx->dep->dflags & DFLAG_EXIT_PAYLOAD) ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); else ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); @@ -1584,9 +1597,9 @@ erts_dsig_send_exit_tt(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, int erts_dsig_send_exit(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Eterm reason) { - Eterm ctl, msg = ctx->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE; + Eterm ctl, msg = ctx->dep->dflags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE; - if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { + if (ctx->dep->dflags & DFLAG_EXIT_PAYLOAD) { ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); msg = reason; } else { @@ -1601,7 +1614,7 @@ erts_dsig_send_exit2(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Eterm { Eterm ctl, msg; - if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { + if (ctx->dep->dflags & DFLAG_EXIT_PAYLOAD) { ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT2), local, remote); msg = reason; @@ -2249,7 +2262,7 @@ int erts_net_message(Port *prt, * the atom '' (empty cookie). */ ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT) - ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER)) + ? (is_pid(tuple[2]) && (dep->dflags & DFLAG_SEND_SENDER)) : tuple[2] == am_Empty); #ifdef ERTS_DIST_MSG_DBG @@ -2859,7 +2872,7 @@ retry: ctx->connection_id = dep->connection_id; ctx->no_suspend = no_suspend; ctx->no_trap = no_trap; - ctx->flags = dep->flags; + ctx->dflags = dep->dflags; ctx->return_term = am_true; ctx->phase = ERTS_DSIG_SEND_PHASE_INIT; ctx->from = proc ? proc->common.id : am_undefined; @@ -2926,7 +2939,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx) if (!erts_is_alive) return ERTS_DSIG_SEND_OK; - if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { + if (ctx->dflags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); } else { @@ -2944,7 +2957,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx) ctx->data_size = 0; erts_reset_atom_cache_map(ctx->acmp); - ERTS_INIT_TTBSizeContext(&ctx->u.sc, ctx->flags); + ERTS_INIT_TTBSizeContext(&ctx->u.sc, ctx->dflags); while (1) { ErtsExtSzRes sz_res; @@ -3010,9 +3023,9 @@ erts_dsig_send(ErtsDSigSendContext *ctx) } case ERTS_DSIG_SEND_PHASE_ALLOC: { - erts_finalize_atom_cache_map(ctx->acmp, ctx->flags); + erts_finalize_atom_cache_map(ctx->acmp, ctx->dflags); - ERTS_INIT_TTBEncodeContext(&ctx->u.ec, ctx->flags); + ERTS_INIT_TTBEncodeContext(&ctx->u.ec, ctx->dflags); ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(&ctx->u.ec, ctx->acmp, ctx->fragments); @@ -3037,7 +3050,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx) Sint reds = CONTEXT_REDS; /* Encode control message */ int res = erts_encode_dist_ext(ctx->ctl, &ctx->extp, - ctx->flags, ctx->acmp, + ctx->dflags, ctx->acmp, &ctx->u.ec, &ctx->fragments, &reds); ctx->reds -= CONTEXT_REDS - reds; @@ -3062,7 +3075,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx) } while (1) { int res = erts_encode_dist_ext(ctx->msg, &ctx->extp, - ctx->flags, ctx->acmp, + ctx->dflags, ctx->acmp, &ctx->u.ec, &ctx->fragments, redsp); @@ -3493,7 +3506,7 @@ erts_dist_command(Port *prt, int initial_reds) { Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; enum dist_entry_state state; - Uint32 flags; + Uint64 flags; Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); @@ -3506,7 +3519,7 @@ erts_dist_command(Port *prt, int initial_reds) erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0); erts_de_rlock(dep); - flags = dep->flags; + flags = dep->dflags; state = dep->state; send = dep->send; erts_de_runlock(dep); @@ -4131,7 +4144,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) obuf = dep->tmp_out_queue.first; obufsize += size_obuf(obuf); - reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->dflags, reds); obufsize -= size_obuf(obuf); if (reds < 0) { /* finalize needs to be restarted... */ erts_de_runlock(dep); @@ -4528,6 +4541,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) if (success) { inc_no_nodes(); erts_set_this_node(BIF_ARG_1, (Uint32) creation); + erts_this_dist_entry->creation = creation; erts_is_alive = 1; send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); erts_proc_lock(net_kernel, ERTS_PROC_LOCKS_ALL); @@ -4569,7 +4583,8 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) typedef struct { DistEntry *dep; int de_locked; - Uint flags; + Uint64 dflags; + Uint32 creation; Uint version; Eterm setup_pid; Process *net_kernel; @@ -4577,24 +4592,26 @@ typedef struct { static int setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, - Eterm ctrlr, Uint flags, - Uint version, Eterm setup_pid, + Eterm ctrlr, Uint64 flags, + Uint32 creation, Eterm setup_pid, Process *net_kernel); static Eterm setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp); -BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) +BIF_RETTYPE erts_internal_create_dist_channel_3(BIF_ALIST_3) { BIF_RETTYPE ret; - Uint flags; + Uint64 flags; Uint version; + Uint32 creation; Eterm *hp, res_tag = THE_NON_VALUE, res = THE_NON_VALUE; DistEntry *dep = NULL; int de_locked = 0; Port *pp = NULL; int true_nk; + Eterm *tpl; Process *net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, am_net_kernel, ERTS_PROC_LOCK_STATUS, @@ -4620,19 +4637,27 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) if (!is_internal_port(BIF_ARG_2) && !is_internal_pid(BIF_ARG_2)) goto badarg; + if (!is_tuple_arity(BIF_ARG_3, 3)) + goto badarg; + + tpl = tuple_val(BIF_ARG_3); + /* Dist flags... */ - if (!is_small(BIF_ARG_3)) + if (!term_to_Uint64(tpl[1], &flags)) goto badarg; - flags = unsigned_val(BIF_ARG_3); /* Version... */ - if (!is_small(BIF_ARG_4)) + if (!is_small(tpl[2])) goto badarg; - version = unsigned_val(BIF_ARG_4); + version = unsigned_val(tpl[2]); if (version == 0) goto badarg; + /* Creation... */ + if (!term_to_Uint32(tpl[3], &creation)) + goto badarg; + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); @@ -4673,7 +4698,8 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) scdc.dep = dep; scdc.de_locked = 1; - scdc.flags = flags; + scdc.dflags = flags; + scdc.creation = creation; scdc.version = version; scdc.setup_pid = BIF_P->common.id; scdc.net_kernel = net_kernel; @@ -4702,7 +4728,8 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) scdcp->dep = dep; scdcp->de_locked = 0; - scdcp->flags = flags; + scdcp->dflags = flags; + scdcp->creation = creation; scdcp->version = version; scdcp->setup_pid = BIF_P->common.id; scdcp->net_kernel = net_kernel; @@ -4777,7 +4804,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) conn_id = dep->connection_id; set_res = setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, - version, BIF_P->common.id, + creation, BIF_P->common.id, net_kernel); /* Dec of refc on net_kernel by setup_connection_epiloge_rwunlock() */ net_kernel = NULL; @@ -4823,9 +4850,9 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) dep->suspended_nodeup = BIF_P; erts_proc_inc_refc(BIF_P); erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - ERTS_BIF_PREP_YIELD4(ret, - &bif_trap_export[BIF_erts_internal_create_dist_channel_4], - BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4); + ERTS_BIF_PREP_YIELD3(ret, + &bif_trap_export[BIF_erts_internal_create_dist_channel_3], + BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); goto done; badarg: @@ -4839,8 +4866,8 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) static int setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, - Eterm ctrlr, Uint flags, - Uint version, Eterm setup_pid, + Eterm ctrlr, Uint64 flags, + Uint32 creation, Eterm setup_pid, Process *net_kernel) { Eterm notify_proc = NIL; @@ -4869,8 +4896,7 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, if (!success) return 0; - dep->version = version; - dep->creation = 0; + dep->creation = creation; ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr)); ASSERT(dep->state == ERTS_DE_STATE_PENDING); @@ -4956,7 +4982,7 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * *redsp = 5; if (!setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id, - scdcp->flags, scdcp->version, + scdcp->dflags, scdcp->creation, scdcp->setup_pid, scdcp->net_kernel)) { erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); @@ -4999,18 +5025,40 @@ BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0) { if (erts_dflags_test_remove_hopefull_flags) { /* For internal emulator tests only! */ - Eterm *hp = HAlloc(BIF_P, 1+6); - return TUPLE6(hp, am_erts_dflags, - make_small(DFLAG_DIST_DEFAULT & ~DFLAG_DIST_HOPEFULLY), - make_small(DFLAG_DIST_MANDATORY & ~DFLAG_DIST_HOPEFULLY), - make_small(DFLAG_DIST_ADDABLE & ~DFLAG_DIST_HOPEFULLY), - make_small(DFLAG_DIST_REJECTABLE & ~DFLAG_DIST_HOPEFULLY), - make_small(DFLAG_DIST_STRICT_ORDER & ~DFLAG_DIST_HOPEFULLY)); + Eterm *hp, **hpp = NULL; + Uint sz = 0, *szp = &sz; + Eterm res; + while (1) { + res = erts_bld_tuple(hpp, szp, 6, + am_erts_dflags, + erts_bld_uint64(hpp, szp, DFLAG_DIST_DEFAULT & ~DFLAG_DIST_HOPEFULLY), + erts_bld_uint64(hpp, szp, DFLAG_DIST_MANDATORY & ~DFLAG_DIST_HOPEFULLY), + erts_bld_uint64(hpp, szp, DFLAG_DIST_ADDABLE & ~DFLAG_DIST_HOPEFULLY), + erts_bld_uint64(hpp, szp, DFLAG_DIST_REJECTABLE & ~DFLAG_DIST_HOPEFULLY), + erts_bld_uint64(hpp, szp, DFLAG_DIST_STRICT_ORDER & ~DFLAG_DIST_HOPEFULLY)); + if (hpp) { + ASSERT(is_value(res)); + return res; + } + hp = HAlloc(BIF_P, sz); + hpp = &hp; + szp = NULL; + } } return erts_dflags_record; } +BIF_RETTYPE erts_internal_get_creation_0(BIF_ALIST_0) +{ + Eterm *hp; + Uint hsz = 0; + + erts_bld_uint(NULL, &hsz, erts_this_dist_entry->creation); + hp = HAlloc(BIF_P, hsz); + return erts_bld_uint(&hp, NULL, erts_this_dist_entry->creation); +} + BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) { DistEntry* dep; @@ -5432,7 +5480,7 @@ BIF_RETTYPE erts_internal_dist_spawn_request_4(BIF_ALIST_4) goto noconnection; case ERTS_DSIG_PREP_CONNECTED: - if (!(dep->flags & DFLAG_SPAWN)) { + if (!(dep->dflags & DFLAG_SPAWN)) { erts_de_runlock(dep); goto notsup; } @@ -5896,7 +5944,7 @@ BIF_RETTYPE monitor_node_2(BIF_ALIST_2) BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) { DistEntry *de; - Uint32 f; + Uint64 f; if (is_not_pid(BIF_ARG_1)) { BIF_ERROR(BIF_P,BADARG); } @@ -5906,7 +5954,7 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) BIF_RET(am_true); } erts_de_rlock(de); - f = de->flags; + f = de->dflags; erts_de_runlock(de); BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false)); } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index b0fbfe96f9..65c29caeb3 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -25,31 +25,39 @@ #include "erl_node_tables.h" #include "zlib.h" -#define DFLAG_PUBLISHED 0x01 -#define DFLAG_ATOM_CACHE 0x02 -#define DFLAG_EXTENDED_REFERENCES 0x04 -#define DFLAG_DIST_MONITOR 0x08 -#define DFLAG_FUN_TAGS 0x10 -#define DFLAG_DIST_MONITOR_NAME 0x20 -#define DFLAG_HIDDEN_ATOM_CACHE 0x40 -#define DFLAG_NEW_FUN_TAGS 0x80 -#define DFLAG_EXTENDED_PIDS_PORTS 0x100 -#define DFLAG_EXPORT_PTR_TAG 0x200 -#define DFLAG_BIT_BINARIES 0x400 -#define DFLAG_NEW_FLOATS 0x800 -#define DFLAG_UNICODE_IO 0x1000 -#define DFLAG_DIST_HDR_ATOM_CACHE 0x2000 -#define DFLAG_SMALL_ATOM_TAGS 0x4000 -#define DFLAG_INTERNAL_TAGS 0x8000 /* used by ETS 'compressed' option */ -#define DFLAG_UTF8_ATOMS 0x10000 -#define DFLAG_MAP_TAG 0x20000 -#define DFLAG_BIG_CREATION 0x40000 -#define DFLAG_SEND_SENDER 0x80000 -#define DFLAG_BIG_SEQTRACE_LABELS 0x100000 -#define DFLAG_PENDING_CONNECT 0x200000 /* internal for pending connection */ -#define DFLAG_EXIT_PAYLOAD 0x400000 -#define DFLAG_FRAGMENTS 0x800000 -#define DFLAG_SPAWN 0x1000000 +#define DFLAG_PUBLISHED ((Uint64)0x01) +#define DFLAG_ATOM_CACHE ((Uint64)0x02) +#define DFLAG_EXTENDED_REFERENCES ((Uint64)0x04) +#define DFLAG_DIST_MONITOR ((Uint64)0x08) +#define DFLAG_FUN_TAGS ((Uint64)0x10) +#define DFLAG_DIST_MONITOR_NAME ((Uint64)0x20) +#define DFLAG_HIDDEN_ATOM_CACHE ((Uint64)0x40) +#define DFLAG_NEW_FUN_TAGS ((Uint64)0x80) +#define DFLAG_EXTENDED_PIDS_PORTS ((Uint64)0x100) +#define DFLAG_EXPORT_PTR_TAG ((Uint64)0x200) +#define DFLAG_BIT_BINARIES ((Uint64)0x400) +#define DFLAG_NEW_FLOATS ((Uint64)0x800) +#define DFLAG_UNICODE_IO ((Uint64)0x1000) +#define DFLAG_DIST_HDR_ATOM_CACHE ((Uint64)0x2000) +#define DFLAG_SMALL_ATOM_TAGS ((Uint64)0x4000) +#define DFLAG_ETS_COMPRESSED ((Uint64)0x8000) /* internal */ +#define DFLAG_UTF8_ATOMS ((Uint64)0x10000) +#define DFLAG_MAP_TAG ((Uint64)0x20000) +#define DFLAG_BIG_CREATION ((Uint64)0x40000) +#define DFLAG_SEND_SENDER ((Uint64)0x80000) +#define DFLAG_BIG_SEQTRACE_LABELS ((Uint64)0x100000) +#define DFLAG_PENDING_CONNECT ((Uint64)0x200000) /* internal */ +#define DFLAG_EXIT_PAYLOAD ((Uint64)0x400000) +#define DFLAG_FRAGMENTS ((Uint64)0x800000) +#define DFLAG_HANDSHAKE_23 ((Uint64)0x1000000) +#define DFLAG_RESERVED 0xfe000000 +/* + * As the old handshake only support 32 flag bits, we reserve the remainding + * bits in the lower 32 for changes in the handshake protocol or potentially + * new capabilities that we also want to backport to OTP-22 or older. + */ +#define DFLAG_SPAWN ((Uint64)0x100000000) + /* Mandatory flags for distribution */ #define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ @@ -82,6 +90,7 @@ | DFLAG_BIG_SEQTRACE_LABELS \ | DFLAG_EXIT_PAYLOAD \ | DFLAG_FRAGMENTS \ + | DFLAG_HANDSHAKE_23 \ | DFLAG_SPAWN) /* Flags addable by local distr implementations */ @@ -208,7 +217,7 @@ extern int erts_dflags_test_remove_hopefull_flags; typedef enum { TTBSize, TTBEncode, TTBCompress } TTBState; typedef struct TTBSizeContext_ { - Uint flags; + Uint64 dflags; int level; Sint vlen; int iovec; @@ -223,7 +232,7 @@ typedef struct TTBSizeContext_ { #define ERTS_INIT_TTBSizeContext(Ctx, Flags) \ do { \ (Ctx)->wstack.wstart = NULL; \ - (Ctx)->flags = (Flags); \ + (Ctx)->dflags = (Flags); \ (Ctx)->level = 0; \ (Ctx)->vlen = -1; \ (Ctx)->fragment_size = ~((Uint) 0); \ @@ -232,8 +241,8 @@ typedef struct TTBSizeContext_ { } while (0) typedef struct TTBEncodeContext_ { - Uint flags; - Uint hopefull_flags; + Uint64 dflags; + Uint64 hopefull_flags; byte *hopefull_flagsp; int level; byte* ep; @@ -261,7 +270,7 @@ typedef struct TTBEncodeContext_ { #define ERTS_INIT_TTBEncodeContext(Ctx, Flags) \ do { \ (Ctx)->wstack.wstart = NULL; \ - (Ctx)->flags = (Flags); \ + (Ctx)->dflags = (Flags); \ (Ctx)->level = 0; \ (Ctx)->vlen = 0; \ (Ctx)->size = 0; \ @@ -331,7 +340,7 @@ typedef struct erts_dsig_send_context { ErtsDistOutputBuf *obuf; Uint alloced_fragments, fragments; Sint vlen; - Uint32 flags; + Uint64 dflags; Process *c_p; union { TTBSizeContext sc; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 23df37b597..5945bf48d9 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -4284,9 +4284,9 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) } } else if (ERTS_IS_ATOM_STR("term_to_binary_tuple_fallbacks", tp[1])) { - Uint dflags = (TERM_TO_BINARY_DFLAGS - & ~DFLAG_EXPORT_PTR_TAG - & ~DFLAG_BIT_BINARIES); + Uint64 dflags = (TERM_TO_BINARY_DFLAGS + & ~DFLAG_EXPORT_PTR_TAG + & ~DFLAG_BIT_BINARIES); Eterm res = erts_term_to_binary(BIF_P, tp[2], 0, dflags); if (is_value(res)) BIF_RET(res); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 213983b0fd..0f30f71a0a 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -178,9 +178,8 @@ dist_table_alloc(void *dep_tmpl) dep->state = ERTS_DE_STATE_IDLE; dep->pending_nodedown = 0; dep->suspended_nodeup = NULL; - dep->flags = 0; + dep->dflags = 0; dep->opts = 0; - dep->version = 0; dep->mld = NULL; @@ -635,7 +634,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep) else { ASSERT(dep->state != ERTS_DE_STATE_IDLE); ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); - if (dep->flags & DFLAG_PUBLISHED) { + if (dep->dflags & DFLAG_PUBLISHED) { ASSERT(erts_no_of_visible_dist_entries > 0); erts_no_of_visible_dist_entries--; head = &erts_visible_dist_entries; @@ -659,7 +658,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep) dep->next->prev = dep->prev; dep->state = ERTS_DE_STATE_IDLE; - dep->flags = 0; + dep->dflags = 0; dep->opts = 0; dep->prev = NULL; dep->cid = NIL; @@ -701,7 +700,7 @@ erts_set_dist_entry_pending(DistEntry *dep) erts_no_of_not_connected_dist_entries--; dep->state = ERTS_DE_STATE_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY | DFLAG_PENDING_CONNECT); + dep->dflags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY | DFLAG_PENDING_CONNECT); dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; ASSERT(!dep->mld); @@ -720,7 +719,7 @@ erts_set_dist_entry_pending(DistEntry *dep) } void -erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) +erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint64 flags) { erts_aint32_t set_qflgs; @@ -751,7 +750,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) erts_no_of_pending_dist_entries--; dep->state = ERTS_DE_STATE_CONNECTED; - dep->flags = flags & ~DFLAG_PENDING_CONNECT; + dep->dflags = flags & ~DFLAG_PENDING_CONNECT; dep->cid = cid; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) cid); diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 49708ccc67..f426f46d53 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -145,10 +145,9 @@ struct dist_entry_ { enum dist_entry_state state; int pending_nodedown; Process* suspended_nodeup; - Uint32 flags; /* Distribution flags, like hidden, + Uint64 dflags; /* Distribution flags, like hidden, atom cache etc. */ Uint32 opts; - unsigned long version; /* Protocol version */ ErtsMonLnkDist *mld; /* Monitors and links */ @@ -257,7 +256,7 @@ Uint erts_dist_table_size(void); void erts_dist_table_info(fmtfn_t, void *); void erts_set_dist_entry_not_connected(DistEntry *); void erts_set_dist_entry_pending(DistEntry *); -void erts_set_dist_entry_connected(DistEntry *, Eterm, Uint); +void erts_set_dist_entry_connected(DistEntry *, Eterm, Uint64); ErlNode *erts_find_or_insert_node(Eterm, Uint32, Eterm); void erts_schedule_delete_node(ErlNode *); void erts_set_this_node(Eterm, Uint32); diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index d9587fe11e..5d91c1b2cb 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -101,13 +101,13 @@ static Export term_to_binary_trap_export; -static byte* enc_term(ErtsAtomCacheMap *, Eterm, byte*, Uint32, struct erl_off_heap_header** off_heap); +static byte* enc_term(ErtsAtomCacheMap *, Eterm, byte*, Uint64, struct erl_off_heap_header** off_heap); struct TTBEncodeContext_; -static int enc_term_int(struct TTBEncodeContext_*,ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, Uint32 dflags, +static int enc_term_int(struct TTBEncodeContext_*,ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, Uint64 dflags, struct erl_off_heap_header** off_heap, Sint *reds, byte **res); static int is_external_string(Eterm obj, Uint* lenp); -static byte* enc_atom(ErtsAtomCacheMap *, Eterm, byte*, Uint32); -static byte* enc_pid(ErtsAtomCacheMap *, Eterm, byte*, Uint32); +static byte* enc_atom(ErtsAtomCacheMap *, Eterm, byte*, Uint64); +static byte* enc_pid(ErtsAtomCacheMap *, Eterm, byte*, Uint64); struct B2TContext_t; static byte* dec_term(ErtsDistExternal*, ErtsHeapFactory*, byte*, Eterm*, struct B2TContext_t*, int); static byte* dec_atom(ErtsDistExternal *, byte*, Eterm*); @@ -116,16 +116,16 @@ static Sint decoded_size(byte *ep, byte* endp, int internal_tags, struct B2TCont static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1); static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm opts, int level, - Uint flags, Binary *context_b, int iovec, + Uint64 dflags, Binary *context_b, int iovec, Uint fragment_size); -static Uint encode_size_struct2(ErtsAtomCacheMap *, Eterm, unsigned); +static Uint encode_size_struct2(ErtsAtomCacheMap *, Eterm, Uint64); static ErtsExtSzRes encode_size_struct_int(TTBSizeContext*, ErtsAtomCacheMap *acmp, - Eterm obj, unsigned dflags, Sint *reds, Uint *res); + Eterm obj, Uint64 dflags, Sint *reds, Uint *res); static Export binary_to_term_trap_export; static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); -static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint32 dflags, Sint reds); +static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint64 dflags, Sint reds); static byte *hopefull_bit_binary(TTBEncodeContext* ctx, byte **epp, Binary *pb_val, Eterm pb_term, byte *bytes, byte bitoffs, byte bitsize, Uint sz); static void hopefull_export(TTBEncodeContext* ctx, byte **epp, Export* exp, Uint32 dflags, @@ -228,7 +228,7 @@ erts_destroy_atom_cache_map(ErtsAtomCacheMap *acmp) } static ERTS_INLINE void -insert_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) +insert_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint64 dflags) { if (acmp && acmp->sz < ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES) { int ix; @@ -244,7 +244,7 @@ insert_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) } static ERTS_INLINE int -get_iix_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) +get_iix_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint64 dflags) { if (!acmp) return -1; @@ -264,7 +264,7 @@ get_iix_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) } void -erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) +erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint64 dflags) { if (acmp) { int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */ @@ -306,12 +306,11 @@ erts_encode_ext_dist_header_size(TTBEncodeContext *ctx, ErtsAtomCacheMap *acmp, Uint fragments) { - - if (ctx->flags & DFLAG_PENDING_CONNECT) { + if (ctx->dflags & DFLAG_PENDING_CONNECT) { /* HOPEFUL_DATA + hopefull flags + hopefull ix + payload ix */ - return 1 + 4 + 4 + 4; + return 1 + 8 + 4 + 4; } - else if (!acmp && !(ctx->flags & DFLAG_FRAGMENTS)) + else if (!acmp && !(ctx->dflags & DFLAG_FRAGMENTS)) return 1; /* pass through */ else { int fix_sz @@ -329,7 +328,7 @@ erts_encode_ext_dist_header_size(TTBEncodeContext *ctx, ASSERT(acmp->hdr_sz >= 0); fix_sz += acmp->hdr_sz; } else { - ASSERT(ctx->flags & DFLAG_FRAGMENTS); + ASSERT(ctx->dflags & DFLAG_FRAGMENTS); } return fix_sz; @@ -342,7 +341,7 @@ byte *erts_encode_ext_dist_header_setup(TTBEncodeContext *ctx, { /* Maximum number of atom must be less than the maximum of a 32 bits unsigned integer. Check is done in erl_init.c, erl_start function. */ - if (ctx->flags & DFLAG_PENDING_CONNECT) { + if (ctx->dflags & DFLAG_PENDING_CONNECT) { byte *ep = ctl_ext; ep -= 4; ctx->payload_ixp = ep; @@ -350,13 +349,13 @@ byte *erts_encode_ext_dist_header_setup(TTBEncodeContext *ctx, ep -= 4; ctx->hopefull_ixp = ep; put_int32(ERTS_NO_HIX, ep); - ep -= 4; + ep -= 8; ctx->hopefull_flagsp = ep; - put_int32(0, ep); + put_int64(0, ep); *--ep = HOPEFUL_DATA; return ep; } - else if (!acmp && !(ctx->flags & DFLAG_FRAGMENTS)) { + else if (!acmp && !(ctx->dflags & DFLAG_FRAGMENTS)) { byte *ep = ctl_ext; *--ep = PASS_THROUGH; return ep; @@ -390,7 +389,7 @@ byte *erts_encode_ext_dist_header_setup(TTBEncodeContext *ctx, --ep; put_int8(acmp->sz, ep); } else { - ASSERT(ctx->flags & DFLAG_FRAGMENTS); + ASSERT(ctx->dflags & DFLAG_FRAGMENTS); /* If we don't have an atom cache but are using a dist header we just put 0 in the atom cache size slot */ --ep; @@ -432,7 +431,7 @@ byte *erts_encode_ext_dist_header_fragment(byte **hdrpp, Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, DistEntry* dep, - Uint32 dflags, + Uint64 dflags, Sint reds) { byte *ip; @@ -643,18 +642,18 @@ erts_encode_dist_ext_size(Eterm term, if (ctx->vlen < 0) { /* First term as well */ ctx->vlen = 0; - if (ctx->flags & DFLAG_FRAGMENTS) + if (ctx->dflags & DFLAG_FRAGMENTS) ctx->fragment_size = ERTS_DIST_FRAGMENT_SIZE; } #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS))) + if (!(ctx->dflags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS))) #endif sz++ /* VERSION_MAGIC */; } - res = encode_size_struct_int(ctx, acmp, term, ctx->flags, redsp, &sz); + res = encode_size_struct_int(ctx, acmp, term, ctx->dflags, redsp, &sz); if (res == ERTS_EXT_SZ_OK) { Uint total_size, fragments; @@ -692,11 +691,12 @@ ErtsExtSzRes erts_encode_ext_size(Eterm term, Uint *szp) Uint erts_encode_ext_size_ets(Eterm term) { - return encode_size_struct2(NULL, term, TERM_TO_BINARY_DFLAGS|DFLAG_INTERNAL_TAGS); + return encode_size_struct2(NULL, term, + TERM_TO_BINARY_DFLAGS|DFLAG_ETS_COMPRESSED); } -int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap *acmp, +int erts_encode_dist_ext(Eterm term, byte **ext, Uint64 flags, ErtsAtomCacheMap *acmp, TTBEncodeContext* ctx, Uint *fragmentsp, Sint* reds) { int res; @@ -726,7 +726,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap *fragmentsp = res == 0 ? ctx->frag_ix + 1 : ctx->frag_ix; if (flags & DFLAG_PENDING_CONNECT) { ASSERT(ctx->hopefull_flagsp); - put_int32(ctx->hopefull_flags, ctx->hopefull_flagsp); + put_int64(ctx->hopefull_flags, ctx->hopefull_flagsp); } return res; } @@ -745,7 +745,7 @@ void erts_encode_ext(Eterm term, byte **ext) byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off_heap) { - return enc_term(NULL, term, ep, TERM_TO_BINARY_DFLAGS|DFLAG_INTERNAL_TAGS, + return enc_term(NULL, term, ep, TERM_TO_BINARY_DFLAGS|DFLAG_ETS_COMPRESSED, off_heap); } @@ -850,7 +850,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, ASSERT(dep); erts_de_rlock(dep); - ASSERT(dep->flags & DFLAG_UTF8_ATOMS); + ASSERT(dep->dflags & DFLAG_UTF8_ATOMS); if ((dep->state != ERTS_DE_STATE_CONNECTED && @@ -860,7 +860,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, return ERTS_PREP_DIST_EXT_CLOSED; } - if (!(dep->flags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS))) { + if (!(dep->dflags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS))) { /* Skip PASS_THROUGH */ ext++; size--; @@ -890,7 +890,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, edep->data->seq_id = 0; edep->data->frag_id = 1; - if (dep->flags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS)) + if (dep->dflags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS)) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) { @@ -900,7 +900,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, edep->data->extp = ext; } else if (ep[1] == DIST_FRAG_CONT) { - if (!(dep->flags & DFLAG_FRAGMENTS)) + if (!(dep->dflags & DFLAG_FRAGMENTS)) goto bad_hdr; edep->attab.size = 0; edep->data->extp = ext + 1 + 1 + 8 + 8; @@ -917,7 +917,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, goto bad_hdr; if (ep[1] == DIST_FRAG_HEADER) { - if (!(dep->flags & DFLAG_FRAGMENTS)) + if (!(dep->dflags & DFLAG_FRAGMENTS)) goto bad_hdr; edep->data->seq_id = get_int64(&ep[2]); edep->data->frag_id = get_int64(&ep[2+8]); @@ -2189,7 +2189,7 @@ external_size_2(BIF_ALIST_2) } static Eterm -erts_term_to_binary_simple(Process* p, Eterm Term, Uint size, int level, Uint flags) +erts_term_to_binary_simple(Process* p, Eterm Term, Uint size, int level, Uint64 dflags) { Eterm bin; size_t real_size; @@ -2205,7 +2205,7 @@ erts_term_to_binary_simple(Process* p, Eterm Term, Uint size, int level, Uint fl bytes = erts_alloc(ERTS_ALC_T_TMP, size); } - if ((endp = enc_term(NULL, Term, bytes, flags, NULL)) + if ((endp = enc_term(NULL, Term, bytes, dflags, NULL)) == NULL) { erts_exit(ERTS_ERROR_EXIT, "%s, line %d: bad term: %x\n", __FILE__, __LINE__, Term); @@ -2250,7 +2250,7 @@ erts_term_to_binary_simple(Process* p, Eterm Term, Uint size, int level, Uint fl bin = new_binary(p, (byte *)NULL, size); bytes = binary_bytes(bin); bytes[0] = VERSION_MAGIC; - if ((endp = enc_term(NULL, Term, bytes+1, flags, NULL)) + if ((endp = enc_term(NULL, Term, bytes+1, dflags, NULL)) == NULL) { erts_exit(ERTS_ERROR_EXIT, "%s, line %d: bad term: %x\n", __FILE__, __LINE__, Term); @@ -2265,7 +2265,7 @@ erts_term_to_binary_simple(Process* p, Eterm Term, Uint size, int level, Uint fl } Eterm -erts_term_to_binary(Process* p, Eterm Term, int level, Uint flags) { +erts_term_to_binary(Process* p, Eterm Term, int level, Uint64 flags) { Uint size = 0; switch (encode_size_struct_int(NULL, NULL, Term, flags, NULL, &size)) { case ERTS_EXT_SZ_SYSTEM_LIMIT: @@ -2396,7 +2396,7 @@ erts_ttb_iov_init(TTBEncodeContext *ctx, int use_termv, char *ptr, } static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm opts, - int level, Uint flags, Binary *context_b, + int level, Uint64 dflags, Binary *context_b, int iovec, Uint fragment_size) { Eterm *hp; @@ -2438,7 +2438,7 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm /* Setup enough to get started */ context->state = TTBSize; context->alive = 1; - ERTS_INIT_TTBSizeContext(&context->s.sc, flags); + ERTS_INIT_TTBSizeContext(&context->s.sc, dflags); context->s.sc.level = level; context->s.sc.fragment_size = fragment_size; if (!level) { @@ -2458,7 +2458,7 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm switch (context->state) { case TTBSize: { - Uint size, flags, fragments = 1; + Uint size, fragments = 1; Binary *result_bin; int level = context->s.sc.level; Sint vlen; @@ -2466,7 +2466,7 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm fragment_size = context->s.sc.fragment_size; size = 1; /* VERSION_MAGIC */ switch (encode_size_struct_int(&context->s.sc, NULL, Term, - context->s.sc.flags, &reds, + context->s.sc.dflags, &reds, &size)) { case ERTS_EXT_SZ_SYSTEM_LIMIT: BUMP_REDS(p, (initial_reds - reds) / TERM_TO_BINARY_LOOP_FACTOR); @@ -2479,7 +2479,7 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm break; } /* Move these to next state */ - flags = context->s.sc.flags; + dflags = context->s.sc.dflags; vlen = context->s.sc.vlen; if (vlen >= 0) { Uint total_size = size + context->s.sc.extra_size; @@ -2490,7 +2490,7 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm else if (size <= ERL_ONHEAP_BIN_LIMIT) { /* Finish in one go */ res = erts_term_to_binary_simple(p, Term, size, - level, flags); + level, dflags); if (iovec) { Eterm *hp = HAlloc(p, 2); res = CONS(hp, res, NIL); @@ -2503,7 +2503,7 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm result_bin->orig_bytes[0] = (byte)VERSION_MAGIC; /* Next state immediately, no need to export context */ context->state = TTBEncode; - ERTS_INIT_TTBEncodeContext(&context->s.ec, flags); + ERTS_INIT_TTBEncodeContext(&context->s.ec, dflags); context->s.ec.level = level; context->s.ec.result_bin = result_bin; context->s.ec.iovec = iovec; @@ -2525,8 +2525,8 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm Sint realloc_offset; Uint fragments; - flags = context->s.ec.flags; - if (enc_term_int(&context->s.ec, NULL,Term, bytes+1, flags, + dflags = context->s.ec.dflags; + if (enc_term_int(&context->s.ec, NULL,Term, bytes+1, dflags, NULL, &reds, &endp) < 0) { EXPORT_CONTEXT(); RETURN_STATE(); @@ -2769,7 +2769,7 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm */ static byte* -enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint32 dflags) +enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint64 dflags) { int iix; int len; @@ -2777,7 +2777,7 @@ enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint32 dflags) ASSERT(is_atom(atom)); - if (dflags & DFLAG_INTERNAL_TAGS) { + if (dflags & DFLAG_ETS_COMPRESSED) { Uint aval = atom_val(atom); ASSERT(aval < (1<<24)); if (aval >= (1 << 16)) { @@ -2854,16 +2854,16 @@ enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint32 dflags) /* * We use this atom as sysname in local pid/port/refs - * for the ETS compressed format (DFLAG_INTERNAL_TAGS). + * for the ETS compressed format * */ #define INTERNAL_LOCAL_SYSNAME am_ErtsSecretAtom static byte* -enc_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint32 dflags) +enc_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags) { Uint on, os; - Eterm sysname = ((is_internal_pid(pid) && (dflags & DFLAG_INTERNAL_TAGS)) + Eterm sysname = ((is_internal_pid(pid) && (dflags & DFLAG_ETS_COMPRESSED)) ? INTERNAL_LOCAL_SYSNAME : pid_node_name(pid)); Uint32 creation = pid_creation(pid); @@ -3044,7 +3044,7 @@ dec_pid(ErtsDistExternal *edep, ErtsHeapFactory* factory, byte* ep, #define ENC_LAST_ARRAY_ELEMENT ((Eterm) 6) static byte* -enc_term(ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, Uint32 dflags, +enc_term(ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, Uint64 dflags, struct erl_off_heap_header** off_heap) { byte *res; @@ -3053,7 +3053,8 @@ enc_term(ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, Uint32 dflags, } static int -enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, Uint32 dflags, +enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, + Uint64 dflags, struct erl_off_heap_header** off_heap, Sint *reds, byte **res) { DECLARE_WSTACK(s); @@ -3260,7 +3261,7 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, case REF_DEF: case EXTERNAL_REF_DEF: { Uint32 *ref_num; - Eterm sysname = (((dflags & DFLAG_INTERNAL_TAGS) && is_internal_ref(obj)) + Eterm sysname = (((dflags & DFLAG_ETS_COMPRESSED) && is_internal_ref(obj)) ? INTERNAL_LOCAL_SYSNAME : ref_node_name(obj)); Uint32 creation = ref_creation(obj); @@ -3284,7 +3285,7 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, } case PORT_DEF: case EXTERNAL_PORT_DEF: { - Eterm sysname = (((dflags & DFLAG_INTERNAL_TAGS) && is_internal_port(obj)) + Eterm sysname = (((dflags & DFLAG_ETS_COMPRESSED) && is_internal_port(obj)) ? INTERNAL_LOCAL_SYSNAME : port_node_name(obj)); Uint32 creation = port_creation(obj); @@ -3458,7 +3459,7 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, } } } - else if (dflags & DFLAG_INTERNAL_TAGS) { + else if (dflags & DFLAG_ETS_COMPRESSED) { ProcBin* pb = (ProcBin*) binary_val(obj); Uint bytesize = pb->size; if (pb->thing_word == HEADER_SUB_BIN) { @@ -4963,7 +4964,7 @@ error_hamt: (except for cached atoms) */ static Uint encode_size_struct2(ErtsAtomCacheMap *acmp, Eterm obj, - unsigned dflags) { + Uint64 dflags) { Uint size = 0; ErtsExtSzRes res = encode_size_struct_int(NULL, acmp, obj, dflags, NULL, @@ -4978,7 +4979,7 @@ static Uint encode_size_struct2(ErtsAtomCacheMap *acmp, static ErtsExtSzRes encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, - unsigned dflags, Sint *reds, Uint *res) + Uint64 dflags, Sint *reds, Uint *res) { DECLARE_WSTACK(s); Uint m, i, arity; @@ -5022,7 +5023,7 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, result++; break; case ATOM_DEF: - if (dflags & DFLAG_INTERNAL_TAGS) { + if (dflags & DFLAG_ETS_COMPRESSED) { if (atom_val(obj) >= (1<<16)) { result += 1 + 3; } @@ -5187,7 +5188,7 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, Uint bin_size = pb->size; byte bitoffs = 0; byte bitsize = 0; - if (dflags & DFLAG_INTERNAL_TAGS) { + if (dflags & DFLAG_ETS_COMPRESSED) { ProcBin* pb = (ProcBin*) binary_val(obj); Uint sub_extra = 0; if (pb->thing_word == HEADER_SUB_BIN) { @@ -5749,13 +5750,14 @@ error: Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, DistEntry* dep, - Uint32 dflags, + Uint64 dflags, Sint reds) { ErlIOVec* eiov = ob->eiov; SysIOVec* iov = eiov->iov; byte *hdr; - Uint32 hopefull_flags, hopefull_ix, payload_ix; + Uint64 hopefull_flags; + Uint32 hopefull_ix, payload_ix; Sint start_r, r; Uint new_len; byte *ep; @@ -5770,7 +5772,7 @@ Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, * +---+--------------+-----------+----------+ * |'H'|Hopefull Flags|Hopefull IX|Payload IX| * +---+--------------+-----------+----------+ - * 1 4 4 4 + * 1 8 4 4 * * Hopefull flags: Flags corresponding to actual * hopefull encodings in this @@ -5788,7 +5790,7 @@ Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, hdr = (byte *) iov[1].iov_base; ASSERT(HOPEFUL_DATA == *((byte *)iov[1].iov_base)); - ASSERT(iov[1].iov_len == 13); + ASSERT(iov[1].iov_len == 1+8+4+4); /* Control message always begin in vector element 2 */ ep = iov[2].iov_base; @@ -5812,9 +5814,9 @@ Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, } hdr++; - hopefull_flags = get_int32(hdr); + hopefull_flags = get_int64(hdr); - hdr += 4; + hdr += 8; hopefull_ix = get_int32(hdr); if ((~dflags & DFLAG_SPAWN) diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index f6330473d6..bc006f83e2 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -162,13 +162,13 @@ struct TTBEncodeContext_; void erts_init_atom_cache_map(ErtsAtomCacheMap *); void erts_reset_atom_cache_map(ErtsAtomCacheMap *); void erts_destroy_atom_cache_map(ErtsAtomCacheMap *); -void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); +void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint64); Uint erts_encode_ext_dist_header_size(struct TTBEncodeContext_ *ctx, ErtsAtomCacheMap *, Uint); byte *erts_encode_ext_dist_header_setup(struct TTBEncodeContext_ *ctx, byte *, ErtsAtomCacheMap *, Uint, Eterm); byte *erts_encode_ext_dist_header_fragment(byte **, Uint, Eterm); -Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint32 dflags, Sint reds); +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint64 dflags, Sint reds); struct erts_dsig_send_context; typedef enum { @@ -181,7 +181,7 @@ ErtsExtSzRes erts_encode_dist_ext_size(Eterm term, ErtsAtomCacheMap *acmp, struct TTBSizeContext_ *ctx, Uint* szp, Sint *redsp, Sint *vlenp, Uint *fragments); -int erts_encode_dist_ext(Eterm, byte **, Uint32, ErtsAtomCacheMap *, +int erts_encode_dist_ext(Eterm, byte **, Uint64, ErtsAtomCacheMap *, struct TTBEncodeContext_ *, Uint *, Sint *); ErtsExtSzRes erts_encode_ext_size(Eterm, Uint *szp); @@ -214,7 +214,7 @@ Sint erts_decode_ext_size_ets(byte*, Uint); Eterm erts_decode_ext(ErtsHeapFactory*, byte**, Uint32 flags); Eterm erts_decode_ext_ets(ErtsHeapFactory*, byte*); -Eterm erts_term_to_binary(Process* p, Eterm Term, int level, Uint flags); +Eterm erts_term_to_binary(Process* p, Eterm Term, int level, Uint64 flags); Eterm erts_debug_term_to_binary(Process *p, Eterm term, Eterm opts); Sint erts_binary2term_prepare(ErtsBinary2TermState *, byte *, Sint); diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex e79d9cafad..242890c3cd 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex 631afa0809..c172bbccfb 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index ab0cbe883e..bcbe4a5f3d 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -1757,11 +1757,9 @@ setnode(_P1, _P2) -> -spec erlang:setnode(Node, DistCtrlr, Opts) -> dist_handle() when Node :: atom(), DistCtrlr :: port() | pid(), - Opts :: {integer(), integer(), atom(), atom()}. -setnode(Node, DistCtrlr, {Flags, Ver, IC, OC} = Opts) when erlang:is_atom(IC), - erlang:is_atom(OC) -> - case case erts_internal:create_dist_channel(Node, DistCtrlr, - Flags, Ver) of + Opts :: {integer(), integer(), pos_integer()}. +setnode(Node, DistCtrlr, {_Flags, _Ver, _Creation} = Opts) -> + case case erts_internal:create_dist_channel(Node, DistCtrlr, Opts) of {ok, DH} -> DH; {message, Ref} -> receive {Ref, Res} -> Res end; Err -> Err diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 0fa39ebc01..6c23b47895 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -67,6 +67,7 @@ -export([dist_ctrl_put_data/2]). -export([get_dflags/0]). +-export([get_creation/0]). -export([new_connection/1]). -export([abort_pending_connection/2]). @@ -89,7 +90,7 @@ -export([process_flag/3]). --export([create_dist_channel/4]). +-export([create_dist_channel/3]). -export([erase_persistent_terms/0]). @@ -568,6 +569,10 @@ dist_ctrl_put_data(DHandle, IoList) -> get_dflags() -> erlang:nif_error(undefined). +-spec erts_internal:get_creation() -> pos_integer(). +get_creation() -> + erlang:nif_error(undefined). + -spec erts_internal:new_connection(Node) -> ConnId when Node :: atom(), ConnId :: {integer(), erlang:dist_handle()}. @@ -707,17 +712,18 @@ process_display(_Pid, _Type) -> process_flag(_Pid, _Flag, _Value) -> erlang:nif_error(undefined). --spec create_dist_channel(Node, DistCtrlr, Flags, Ver) -> Result when +-spec create_dist_channel(Node, DistCtrlr, {Flags, Ver, Cr}) -> Result when Node :: atom(), DistCtrlr :: port() | pid(), Flags :: integer(), Ver :: integer(), + Cr :: pos_integer(), Result :: {'ok', erlang:dist_handle()} | {'message', reference()} | 'badarg' | 'system_limit'. -create_dist_channel(_Node, _DistCtrlr, _Flags, _Ver) -> +create_dist_channel(_Node, _DistCtrlr, _Tpl) -> erlang:nif_error(undefined). -spec erase_persistent_terms() -> 'ok'. diff --git a/lib/erl_docgen/priv/dtd/common.dtd b/lib/erl_docgen/priv/dtd/common.dtd index 0feb09eac2..90a8d7cbdb 100644 --- a/lib/erl_docgen/priv/dtd/common.dtd +++ b/lib/erl_docgen/priv/dtd/common.dtd @@ -69,7 +69,7 @@ <!ATTLIST list type (ordered|bulleted) "bulleted" > <!ELEMENT taglist (marker*,tag,item+)+ > <!ELEMENT tag (#PCDATA|c|i|em|br|seealso|url|marker|anno)* > -<!ELEMENT item (%inline;|%block;|warning|note|dont|do|quote)* > +<!ELEMENT item (%inline;|%block;|warning|note|dont|do|quote|table)* > <!-- References --> diff --git a/lib/erl_interface/include/ei.h b/lib/erl_interface/include/ei.h index 7d39043bb2..605a0d3327 100644 --- a/lib/erl_interface/include/ei.h +++ b/lib/erl_interface/include/ei.h @@ -362,7 +362,7 @@ typedef struct ei_cnode_s { /* Currently this_ipaddr isn't used */ /* struct in_addr this_ipaddr; */ char ei_connect_cookie[EI_MAX_COOKIE_SIZE+1]; - short creation; + unsigned int creation; erlang_pid self; ei_socket_callbacks *cbs; void *setup_context; diff --git a/lib/erl_interface/src/connect/ei_connect.c b/lib/erl_interface/src/connect/ei_connect.c index f84c89f304..9a115fa6ae 100644 --- a/lib/erl_interface/src/connect/ei_connect.c +++ b/lib/erl_interface/src/connect/ei_connect.c @@ -106,7 +106,8 @@ int ei_tracelevel = 0; (offsetof(ei_socket_callbacks, get_fd) \ + sizeof(int (*)(void *))) -/* FIXME why not macro? */ +typedef EI_ULONGLONG DistFlags; + static char *null_cookie = ""; static int get_cookie(char *buf, int len); @@ -120,15 +121,17 @@ static int send_status(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, char *status, unsigned ms); static int recv_status(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, unsigned ms); -static int send_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, - char *nodename, unsigned challenge, - unsigned version, unsigned ms); +static int send_challenge(ei_cnode *ec, void *ctx, int pkt_sz, + unsigned challenge, + DistFlags version, unsigned ms); static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, unsigned *challenge, unsigned *version, - unsigned *flags, char *namebuf, unsigned ms); + DistFlags *flags, char *namebuf, unsigned ms); static int send_challenge_reply(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, unsigned char digest[16], unsigned challenge, unsigned ms); +static int recv_complement(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned ms); static int recv_challenge_reply(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, unsigned our_challenge, char cookie[], @@ -139,12 +142,14 @@ static int send_challenge_ack(ei_socket_callbacks *cbs, void *ctx, static int recv_challenge_ack(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, unsigned our_challenge, char cookie[], unsigned ms); -static int send_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, - char *nodename, unsigned version, unsigned ms); - +static int send_name(ei_cnode *ec, void *ctx, int pkt_sz, + unsigned version, unsigned ms); +static int send_complement(ei_cnode *ec, void *ctx, int pkt_sz, + unsigned epmd_says_version, DistFlags her_flags, + unsigned ms); static int recv_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, - unsigned *version, unsigned *flags, char *namebuf, - unsigned ms); + char* send_name_tag, DistFlags *flags, + char *namebuf, unsigned ms); static struct hostent* dyn_gethostbyname_r(const char *name, struct hostent *hostp, char **buffer_p, @@ -989,8 +994,9 @@ int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr ip_addr, char *alivename, unsigned void *ctx; int rport = 0; /*uint16 rport = 0;*/ int sockd; - int dist = 0; - unsigned her_flags, her_version; + unsigned epmd_says_version = 0; + unsigned her_version; + DistFlags her_flags; unsigned our_challenge, her_challenge; unsigned char our_digest[16]; int err; @@ -1003,14 +1009,16 @@ int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr ip_addr, char *alivename, unsigned EI_TRACE_CONN1("ei_xconnect","-> CONNECT attempt to connect to %s", alivename); - if ((rport = ei_epmd_port_tmo(ip_addr,alivename,&dist, tmo)) < 0) { + if ((rport = ei_epmd_port_tmo(ip_addr,alivename,(int*)&epmd_says_version, + tmo)) < 0) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT can't get remote port"); /* ei_epmd_port_tmo() has set erl_errno */ return ERL_NO_PORT; } - if (dist <= 4) { - EI_TRACE_ERR0("ei_xconnect","-> CONNECT remote version not compatible"); + if (epmd_says_version < EI_DIST_LOW) { + EI_TRACE_ERR1("ei_xconnect","-> CONNECT remote version %d not compatible", + epmd_says_version); return ERL_ERROR; } @@ -1050,21 +1058,24 @@ int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr ip_addr, char *alivename, unsigned goto error; } - if (send_name(cbs, ctx, pkt_sz, ec->thisnodename, (unsigned) dist, tmo)) + if (send_name(ec, ctx, pkt_sz, epmd_says_version, tmo)) goto error; if (recv_status(cbs, ctx, pkt_sz, tmo)) goto error; - if (recv_challenge(cbs, ctx, pkt_sz, &her_challenge, - &her_version, &her_flags, NULL, tmo)) + if (recv_challenge(cbs, ctx, pkt_sz, &her_challenge, &her_version, + &her_flags, NULL, tmo)) goto error; + her_version = (her_flags & DFLAG_HANDSHAKE_23) ? EI_DIST_6 : EI_DIST_5; our_challenge = gen_challenge(); gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); + if (send_complement(ec, ctx, pkt_sz, epmd_says_version, her_flags, tmo)) + goto error; if (send_challenge_reply(cbs, ctx, pkt_sz, our_digest, our_challenge, tmo)) goto error; if (recv_challenge_ack(cbs, ctx, pkt_sz, our_challenge, ec->ei_connect_cookie, tmo)) goto error; - if (put_ei_socket_info(sockd, dist, null_cookie, ec, cbs, ctx) != 0) + if (put_ei_socket_info(sockd, her_version, null_cookie, ec, cbs, ctx) != 0) goto error; if (cbs->connect_handshake_complete) { @@ -1209,8 +1220,9 @@ int ei_accept(ei_cnode* ec, int lfd, ErlConnect *conp) int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) { int fd; - unsigned her_version, her_flags; + DistFlags her_flags; char tmp_nodename[MAXNODELEN+1]; + char send_name_tag; char *her_name; int pkt_sz, err; struct sockaddr_in addr; @@ -1235,6 +1247,10 @@ int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) ctx = EI_FD_AS_CTX__(lfd); } + if (ec->cbs != cbs) { + EI_CONN_SAVE_ERRNO__(EINVAL); + return ERL_ERROR; + } EI_TRACE_CONN0("ei_accept","<- ACCEPT waiting for connection"); @@ -1281,16 +1297,14 @@ int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) EI_TRACE_CONN0("ei_accept","<- ACCEPT connected to remote"); - if (recv_name(cbs, ctx, pkt_sz, &her_version, &her_flags, her_name, tmo)) { + if (recv_name(cbs, ctx, pkt_sz, &send_name_tag, &her_flags, + her_name, tmo)) { EI_TRACE_ERR0("ei_accept","<- ACCEPT initial ident failed"); goto error; } - if (her_version <= 4) { - EI_TRACE_ERR0("ei_accept","<- ACCEPT remote version not compatible"); - goto error; - } - else { + { + unsigned her_version = (her_flags & DFLAG_HANDSHAKE_23) ? 6 : 5; unsigned our_challenge; unsigned her_challenge; unsigned char our_digest[16]; @@ -1298,9 +1312,12 @@ int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) if (send_status(cbs, ctx, pkt_sz, "ok", tmo)) goto error; our_challenge = gen_challenge(); - if (send_challenge(cbs, ctx, pkt_sz, ec->thisnodename, - our_challenge, her_version, tmo)) + if (send_challenge(ec, ctx, pkt_sz, our_challenge, her_flags, tmo)) goto error; + if (send_name_tag == 'n' && (her_flags & DFLAG_HANDSHAKE_23)) { + if (recv_complement(cbs, ctx, pkt_sz, tmo)) + goto error; + } if (recv_challenge_reply(cbs, ctx, pkt_sz, our_challenge, ec->ei_connect_cookie, &her_challenge, tmo)) goto error; @@ -1846,26 +1863,50 @@ error: return -1; } -static int send_name_or_challenge(ei_socket_callbacks *cbs, - void *ctx, - int pkt_sz, - char *nodename, - int f_chall, - unsigned challenge, - unsigned version, - unsigned ms) +static DistFlags preferred_flags(void) +{ + DistFlags flags = + DFLAG_EXTENDED_REFERENCES + | DFLAG_DIST_MONITOR + | DFLAG_EXTENDED_PIDS_PORTS + | DFLAG_FUN_TAGS + | DFLAG_NEW_FUN_TAGS + | DFLAG_NEW_FLOATS + | DFLAG_SMALL_ATOM_TAGS + | DFLAG_UTF8_ATOMS + | DFLAG_MAP_TAG + | DFLAG_BIG_CREATION + | DFLAG_EXPORT_PTR_TAG + | DFLAG_BIT_BINARIES + | DFLAG_HANDSHAKE_23; + if (ei_internal_use_21_bitstr_expfun()) { + flags &= ~(DFLAG_EXPORT_PTR_TAG + | DFLAG_BIT_BINARIES); + } + return flags; +} + +static int send_name(ei_cnode *ec, + void *ctx, + int pkt_sz, + unsigned version, + unsigned ms) { char *buf; unsigned char *s; char dbuf[DEFBUF_SIZ]; - int siz = pkt_sz + 1 + 2 + 4 + strlen(nodename); - const char* function[] = {"SEND_NAME", "SEND_CHALLENGE"}; + const unsigned int nodename_len = strlen(ec->thisnodename); + int siz; int err; ssize_t len; - unsigned int flags; + DistFlags flags; + const char tag = (version == EI_DIST_5) ? 'n' : 'N'; + + if (tag == 'n') + siz = pkt_sz + 1 + 2 + 4 + nodename_len; + else + siz = pkt_sz + 1 + 8 + 4 + 2 + nodename_len; - if (f_chall) - siz += 4; buf = (siz > DEFBUF_SIZ) ? malloc(siz) : dbuf; if (!buf) { erl_errno = ENOMEM; @@ -1882,35 +1923,95 @@ static int send_name_or_challenge(ei_socket_callbacks *cbs, default: return -1; } - put8(s, 'n'); - put16be(s, version); - flags = (DFLAG_EXTENDED_REFERENCES - | DFLAG_DIST_MONITOR - | DFLAG_EXTENDED_PIDS_PORTS - | DFLAG_FUN_TAGS - | DFLAG_NEW_FUN_TAGS - | DFLAG_NEW_FLOATS - | DFLAG_SMALL_ATOM_TAGS - | DFLAG_UTF8_ATOMS - | DFLAG_MAP_TAG - | DFLAG_BIG_CREATION - | DFLAG_EXPORT_PTR_TAG - | DFLAG_BIT_BINARIES); - if (ei_internal_use_21_bitstr_expfun()) { - flags &= ~(DFLAG_EXPORT_PTR_TAG - | DFLAG_BIT_BINARIES); + flags = preferred_flags(); + + put8(s, tag); + if (tag == 'n') { + put16be(s, EI_DIST_5); /* some impl (jinterface) demand ver==5 */ + put32be(s, flags); + } + else { /* tag == 'N' */ + put64be(s, flags); + put32be(s, ec->creation); + put16be(s, nodename_len); } - put32be(s, flags); - if (f_chall) - put32be(s, challenge); - memcpy(s, nodename, strlen(nodename)); + memcpy(s, ec->thisnodename, nodename_len); len = (ssize_t) siz; - err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + err = ei_write_fill_ctx_t__(ec->cbs, ctx, buf, &len, ms); if (!err && len != (ssize_t) siz) err = EIO; if (err) { - EI_TRACE_ERR1("send_name_or_challenge", - "-> %s socket write failed", function[f_chall]); + EI_TRACE_ERR0("send_name", "SEND_NAME -> socket write failed"); + if (buf != dbuf) + free(buf); + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + + if (buf != dbuf) + free(buf); + return 0; +} + +static int send_challenge(ei_cnode *ec, + void *ctx, + int pkt_sz, + unsigned challenge, + DistFlags her_flags, + unsigned ms) +{ + char *buf; + unsigned char *s; + char dbuf[DEFBUF_SIZ]; + const unsigned int nodename_len = strlen(ec->thisnodename); + int siz; + int err; + ssize_t len; + DistFlags flags; + const char tag = (her_flags & DFLAG_HANDSHAKE_23) ? 'N' : 'n'; + + if (tag == 'n') + siz = pkt_sz + 1 + 2 + 4 + 4 + nodename_len; + else + siz = pkt_sz + 1 + 8 + 4 + 4 + 2 + nodename_len; + + buf = (siz > DEFBUF_SIZ) ? malloc(siz) : dbuf; + if (!buf) { + erl_errno = ENOMEM; + return -1; + } + s = (unsigned char *)buf; + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } + + flags = preferred_flags(); + put8(s, tag); + if (tag == 'n') { + put16be(s, EI_DIST_5); /* choosen version */ + put32be(s, flags); + put32be(s, challenge); + } + else { + put64be(s, flags); + put32be(s, challenge); + put32be(s, ec->creation); + put16be(s, nodename_len); + } + memcpy(s, ec->thisnodename, nodename_len); + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(ec->cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR0("send_challenge", "-> SEND_CHALLENGE socket write failed"); if (buf != dbuf) free(buf); EI_CONN_SAVE_ERRNO__(err); @@ -1924,13 +2025,13 @@ static int send_name_or_challenge(ei_socket_callbacks *cbs, static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, unsigned *challenge, unsigned *version, - unsigned *flags, char *namebuf, unsigned ms) + DistFlags *flags, char *namebuf, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; int is_static = 1; int buflen = DEFBUF_SIZ; - int rlen; + int rlen, nodename_len; char *s; char tag; char tmp_nodename[MAXNODELEN+1]; @@ -1943,21 +2044,57 @@ static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, "<- RECV_CHALLENGE socket read failed (%d)",rlen); goto error; } - if ((rlen - 11) > MAXNODELEN) { - EI_TRACE_ERR1("recv_challenge", - "<- RECV_CHALLENGE nodename too long (%d)",rlen - 11); - goto error; - } s = buf; - if ((tag = get8(s)) != 'n') { + tag = get8(s); + if (tag != 'n' && tag != 'N') { EI_TRACE_ERR2("recv_challenge", "<- RECV_CHALLENGE incorrect tag, " - "expected 'n' got '%c' (%u)",tag,tag); + "expected 'n' or 'N', got '%c' (%u)",tag,tag); goto error; } - *version = get16be(s); - *flags = get32be(s); - *challenge = get32be(s); + if (tag == 'n') { /* OLD */ + unsigned int version; + if (rlen < 1+2+4+4) { + EI_TRACE_ERR1("recv_challenge","<- RECV_CHALLENGE 'n' packet too short (%d)", + rlen) + goto error; + } + + version = get16be(s); + if (version != EI_DIST_5) { + EI_TRACE_ERR1("recv_challenge", + "<- RECV_CHALLENGE 'n' incorrect version=%d", + version); + goto error; + } + *flags = get32be(s); + *challenge = get32be(s); + nodename_len = (buf + rlen) - s; + } + else { /* NEW */ + if (rlen < 1+8+4+4+2) { + EI_TRACE_ERR1("recv_challenge","<- RECV_CHALLENGE 'N' packet too short (%d)", + rlen) + goto error; + } + *version = EI_DIST_6; + *flags = get64be(s); + *challenge = get32be(s); + s += 4; /* ignore peer 'creation' */ + nodename_len = get16be(s); + if (nodename_len > (buf + rlen) - s) { + EI_TRACE_ERR1("recv_challenge", + "<- RECV_CHALLENGE 'N' nodename too long (%d)", + nodename_len); + goto error; + } + } + + if (nodename_len > MAXNODELEN) { + EI_TRACE_ERR1("recv_challenge", + "<- RECV_CHALLENGE nodename too long (%d)", nodename_len); + goto error; + } if (!(*flags & DFLAG_EXTENDED_REFERENCES)) { EI_TRACE_ERR0("recv_challenge","<- RECV_CHALLENGE peer cannot " @@ -1981,8 +2118,8 @@ static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, if (!namebuf) namebuf = &tmp_nodename[0]; - memcpy(namebuf, s, rlen - 11); - namebuf[rlen - 11] = '\0'; + memcpy(namebuf, s, nodename_len); + namebuf[nodename_len] = '\0'; if (!is_static) free(buf); @@ -2003,6 +2140,63 @@ error: return -1; } +static int send_complement(ei_cnode *ec, + void *ctx, + int pkt_sz, + unsigned epmd_says_version, + DistFlags her_flags, + unsigned ms) +{ + if (epmd_says_version == EI_DIST_5 && (her_flags & DFLAG_HANDSHAKE_23)) { + char *buf; + unsigned char *s; + char dbuf[DEFBUF_SIZ]; + int err; + ssize_t len; + unsigned int flagsHigh; + const int siz = pkt_sz + 1 + 4 + 4; + + buf = (siz > DEFBUF_SIZ) ? malloc(siz) : dbuf; + if (!buf) { + erl_errno = ENOMEM; + return -1; + } + s = (unsigned char *)buf; + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } + flagsHigh = preferred_flags() >> 32; + + put8(s, 'c'); + put32be(s, flagsHigh); + put32be(s, ec->creation); + + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(ec->cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR0("send_name", "SEND_NAME -> socket write failed"); + if (buf != dbuf) + free(buf); + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + + if (buf != dbuf) + free(buf); + } + return 0; +} + + static int send_challenge_reply(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, unsigned char digest[16], unsigned challenge, unsigned ms) @@ -2049,6 +2243,54 @@ static int send_challenge_reply(ei_socket_callbacks *cbs, void *ctx, return 0; } +static int recv_complement(ei_socket_callbacks *cbs, + void *ctx, + int pkt_sz, + unsigned ms) +{ + char dbuf[DEFBUF_SIZ]; + char *buf = dbuf; + int is_static = 1; + int buflen = DEFBUF_SIZ; + int rlen; + char *s; + char tag; + unsigned int creation; + + erl_errno = EIO; /* Default */ + + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, &is_static, ms)) != 21) { + EI_TRACE_ERR1("recv_complement", + "<- RECV_COMPLEMENT socket read failed (%d)",rlen); + goto error; + } + + s = buf; + if ((tag = get8(s)) != 'c') { + EI_TRACE_ERR2("recv_complement", + "<- RECV_COMPLEMENT incorrect tag, " + "expected 'c' got '%c' (%u)",tag,tag); + goto error; + } + creation = get32be(s); + if (!is_static) + free(buf); + + if (ei_tracelevel >= 3) { + EI_TRACE_CONN1("recv_complement", + "<- RECV_COMPLEMENT (ok) creation = %u", + creation); + } + /* We don't have any use for 'creation' of other node, so we drop it */ + erl_errno = 0; + return 0; + +error: + if (!is_static) + free(buf); + return -1; +} + static int recv_challenge_reply(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, @@ -2204,30 +2446,16 @@ error: return -1; } -static int send_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, - char *nodename, unsigned version, unsigned ms) -{ - return send_name_or_challenge(cbs, ctx, pkt_sz, nodename, 0, - 0, version, ms); -} - -static int send_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, - char *nodename, unsigned challenge, unsigned version, - unsigned ms) -{ - return send_name_or_challenge(cbs, ctx, pkt_sz, nodename, 1, - challenge, version, ms); -} - static int recv_name(ei_socket_callbacks *cbs, void *ctx, - int pkt_sz, unsigned *version, - unsigned *flags, char *namebuf, unsigned ms) + int pkt_sz, char *send_name_tag, + DistFlags *flags, char *namebuf, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; int is_static = 1; int buflen = DEFBUF_SIZ; int rlen; + unsigned int namelen; char *s; char tmp_nodename[MAXNODELEN+1]; char tag; @@ -2239,19 +2467,40 @@ static int recv_name(ei_socket_callbacks *cbs, void *ctx, EI_TRACE_ERR1("recv_name","<- RECV_NAME socket read failed (%d)",rlen); goto error; } - if ((rlen - 7) > MAXNODELEN) { - EI_TRACE_ERR1("recv_name","<- RECV_NAME nodename too long (%d)",rlen-7); - goto error; - } s = buf; tag = get8(s); - if (tag != 'n') { + *send_name_tag = tag; + if (tag != 'n' && tag != 'N') { EI_TRACE_ERR2("recv_name","<- RECV_NAME incorrect tag, " - "expected 'n' got '%c' (%u)",tag,tag); + "expected 'n' or 'N', got '%c' (%u)",tag,tag); goto error; } - *version = get16be(s); - *flags = get32be(s); + if (tag == 'n') { + unsigned int version; + if (rlen < 1+2+4) { + EI_TRACE_ERR1("recv_name","<- RECV_NAME 'n' packet too short (%d)", + rlen) + goto error; + } + version = get16be(s); + if (version < EI_DIST_5) { + EI_TRACE_ERR1("recv_name","<- RECV_NAME 'n' invalid version=%d", + version) + goto error; + } + *flags = get32be(s); + namelen = rlen - (1+2+4); + } + else { /* tag == 'N' */ + if (rlen < 1+8+4+2) { + EI_TRACE_ERR1("recv_name","<- RECV_NAME 'N' packet too short (%d)", + rlen) + goto error; + } + *flags = get64be(s); + s += 4; /* ignore peer 'creation' */ + namelen = get16be(s); + } if (!(*flags & DFLAG_EXTENDED_REFERENCES)) { EI_TRACE_ERR0("recv_name","<- RECV_NAME peer cannot handle" @@ -2269,14 +2518,20 @@ static int recv_name(ei_socket_callbacks *cbs, void *ctx, if (!namebuf) namebuf = &tmp_nodename[0]; - memcpy(namebuf, s, rlen - 7); - namebuf[rlen - 7] = '\0'; + if (namelen > MAXNODELEN || s+namelen > buf+rlen) { + EI_TRACE_ERR2("recv_name","<- RECV_NAME '%c' nodename too long (%d)", + tag, namelen); + goto error; + } + + memcpy(namebuf, s, namelen); + namebuf[namelen] = '\0'; if (!is_static) free(buf); EI_TRACE_CONN3("recv_name", - "<- RECV_NAME (ok) node = %s, version = %u, flags = %u", - namebuf,*version,*flags); + "<- RECV_NAME (ok) node = %s, tag = %c, flags = %u", + namebuf,tag,*flags); erl_errno = 0; return 0; diff --git a/lib/erl_interface/src/connect/ei_connect_int.h b/lib/erl_interface/src/connect/ei_connect_int.h index b41a5f2b23..428713e015 100644 --- a/lib/erl_interface/src/connect/ei_connect_int.h +++ b/lib/erl_interface/src/connect/ei_connect_int.h @@ -109,6 +109,8 @@ extern int h_errno; #define DFLAG_UTF8_ATOMS 0x10000 #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 +#define DFLAG_HANDSHAKE_23 0x1000000 +#define DFLAG_HANDSHAKE_XX 0xfe000000 /* bits reserved for handshake changes */ ei_cnode *ei_fd_to_cnode(int fd); int ei_distversion(int fd); diff --git a/lib/erl_interface/src/epmd/ei_epmd.h b/lib/erl_interface/src/epmd/ei_epmd.h index 597a955676..e3cb041dc9 100644 --- a/lib/erl_interface/src/epmd/ei_epmd.h +++ b/lib/erl_interface/src/epmd/ei_epmd.h @@ -24,9 +24,12 @@ #define INADDR_LOOPBACK ((u_long) 0x7F000001) #endif +#define EI_DIST_5 5 /* OTP R4 - 22 */ +#define EI_DIST_6 6 /* OTP 23 and later */ + #ifndef EI_DIST_HIGH -#define EI_DIST_HIGH 6 /* OTP 23 and later */ -#define EI_DIST_LOW 5 /* OTP R4 - 22 */ +#define EI_DIST_HIGH EI_DIST_6 +#define EI_DIST_LOW EI_DIST_5 #endif #ifndef EPMD_PORT diff --git a/lib/erl_interface/test/ei_accept_SUITE.erl b/lib/erl_interface/test/ei_accept_SUITE.erl index ceb1e401ff..c49b8a358a 100644 --- a/lib/erl_interface/test/ei_accept_SUITE.erl +++ b/lib/erl_interface/test/ei_accept_SUITE.erl @@ -75,7 +75,8 @@ ei_accept_do(Config, CompatRel, SockImpl) -> {ok, ListenFd} = ei_publish(P, Port), {any, EINode} ! TermToSend, - {ok, Fd, _Node} = ei_accept(P, ListenFd), + {ok, Fd, Node} = ei_accept(P, ListenFd), + Node = node(), Got1 = ei_receive(P, Fd), %% Send again, now without auto-connect diff --git a/lib/erl_interface/test/ei_tmo_SUITE.erl b/lib/erl_interface/test/ei_tmo_SUITE.erl index cbb3f2cf30..8d8776949c 100644 --- a/lib/erl_interface/test/ei_tmo_SUITE.erl +++ b/lib/erl_interface/test/ei_tmo_SUITE.erl @@ -106,6 +106,8 @@ do_one_recv_failure(Config,CNode) -> true = (Ret < 0), runner:recv_eot(P1). +-define(EI_DIST_LOW, 5). +-define(EI_DIST_HIGH, 6). %% Check send with timeouts. ei_send_tmo(Config) when is_list(Config) -> @@ -138,11 +140,15 @@ do_one_send(Config,From,CNode) -> ei_send_failure_tmo(Config) when is_list(Config) -> register(ei_send_tmo_1,self()), - do_one_send_failure(Config,self(),cccc1,c_nod_send_tmo_3), - do_one_send_failure(Config,ei_send_tmo_1,cccc2,c_nod_send_tmo_4), + [begin + io:format("Test dist version ~p\n", [Ver]), + do_one_send_failure(Config,self(),cccc1,c_nod_send_tmo_3, Ver), + do_one_send_failure(Config,ei_send_tmo_1,cccc2,c_nod_send_tmo_4, Ver) + end + || Ver <- lists:seq(?EI_DIST_LOW, ?EI_DIST_HIGH)], ok. -do_one_send_failure(Config,From,FakeName,CName) -> +do_one_send_failure(Config,From,FakeName,CName, OurVer) -> {_,Host} = split(node()), OurName = join(FakeName,Host), Node = join(CName,Host), @@ -152,7 +158,7 @@ do_one_send_failure(Config,From,FakeName,CName) -> Else -> exit(Else) end, - EpmdSocket = register(OurName, LSocket, 1, 5), + EpmdSocket = epmd_register(OurName, LSocket, OurVer), P3 = runner:start(Config, ?send_tmo), Cookie = kaksmula_som_ingen_bryr_sig_om, runner:send_term(P3,{CName, @@ -165,10 +171,10 @@ do_one_send_failure(Config,From,FakeName,CName) -> Else2 -> exit(Else2) end, - {hidden,Node,5} = recv_name(SocketB), % See 1) + {hidden,Node} = recv_name(SocketB, OurVer), % See 1) send_status(SocketB, ok), MyChallengeB = gen_challenge(), - send_challenge(SocketB, OurName, MyChallengeB, 5), + send_challenge(SocketB, OurName, MyChallengeB, OurVer), HisChallengeB = recv_challenge_reply(SocketB, MyChallengeB, Cookie), @@ -214,6 +220,15 @@ ei_connect_unreachable_tmo(Config) when is_list(Config) -> ok. ei_connect_tmo(Config) when is_list(Config) -> + [begin + io:format("Test dist version ~p published as ~p\n", [OurVer,OurEpmdVer]), + do_ei_connect_tmo(Config, OurVer, OurEpmdVer) + end + || OurVer <- lists:seq(?EI_DIST_LOW, ?EI_DIST_HIGH), + OurEpmdVer <- lists:seq(?EI_DIST_LOW, ?EI_DIST_HIGH), + OurVer >= OurEpmdVer]. + +do_ei_connect_tmo(Config, OurVer, OurEpmdVer) -> P2 = runner:start(Config, ?connect_tmo), runner:send_term(P2,{c_nod_connect_tmo_2, erlang:get_cookie(), @@ -232,7 +247,7 @@ ei_connect_tmo(Config) when is_list(Config) -> Else -> exit(Else) end, - EpmdSocket = register(OurName, LSocket, 1, 5), + EpmdSocket = epmd_register(OurName, LSocket, OurEpmdVer), P3 = runner:start(Config, ?connect_tmo), Cookie = kaksmula_som_ingen_bryr_sig_om, runner:send_term(P3,{c_nod_connect_tmo_3, @@ -245,10 +260,11 @@ ei_connect_tmo(Config) when is_list(Config) -> Else2 -> exit(Else2) end, - {hidden,Node,5} = recv_name(SocketB), % See 1) + {hidden,Node} = recv_name(SocketB, OurEpmdVer), % See 1) send_status(SocketB, ok), MyChallengeB = gen_challenge(), - send_challenge(SocketB, OurName, MyChallengeB, 5), + send_challenge(SocketB, OurName, MyChallengeB, OurVer), + recv_complement(SocketB, OurVer, OurEpmdVer), _HisChallengeB = recv_challenge_reply(SocketB, MyChallengeB, Cookie), @@ -261,8 +277,17 @@ ei_connect_tmo(Config) when is_list(Config) -> %% Check accept with timeouts. ei_accept_tmo(Config) when is_list(Config) -> - %%dbg:tracer(), - %%dbg:p(self()), + [begin + io:format("Test our dist ver=~p and assumed ver=~p\n", + [OurVer, AssumedVer]), + do_ei_accept_tmo(Config, OurVer, AssumedVer) + end + || OurVer <- lists:seq(?EI_DIST_LOW, ?EI_DIST_HIGH), + AssumedVer <- lists:seq(?EI_DIST_LOW, ?EI_DIST_HIGH), + OurVer >= AssumedVer], + ok. + +do_ei_accept_tmo(Config, OurVer, AssumedVer) -> P = runner:start(Config, ?accept_tmo), runner:send_term(P,{c_nod_som_ingen_kontaktar_1, kaksmula_som_ingen_bryr_sig_om, @@ -288,13 +313,13 @@ ei_accept_tmo(Config) when is_list(Config) -> {NA,NB} = split(CNode2), {_,Host} = split(node()), OurName = join(ccc,Host), - {port,PortNo,_} = erl_epmd:port_please(NA,NB), + {port,PortNo,?EI_DIST_HIGH} = erl_epmd:port_please(NA,NB), {ok, SocketA} = gen_tcp:connect(atom_to_list(NB),PortNo, [{active,false}, {packet,2}]), - send_name(SocketA,OurName,5), + send_name(SocketA,OurName,OurVer,AssumedVer), ok = recv_status(SocketA), - {hidden,_Node,5,HisChallengeA} = recv_challenge(SocketA), % See 1) + {hidden,_Node,HisChallengeA} = recv_challenge(SocketA,OurVer), % See 1) _OurChallengeA = gen_challenge(), _OurDigestA = gen_digest(HisChallengeA, erlang:get_cookie()), %% Dont do the last two steps of the connection setup... @@ -340,6 +365,7 @@ make_and_check_dummy() -> -define(DFLAG_EXTENDED_PIDS_PORTS,16#100). -define(DFLAG_NEW_FLOATS,16#800). -define(DFLAG_DIST_MONITOR,8). +-define(DFLAG_HANDSHAKE_23,16#1000000). %% From R9 and forward extended references is compulsory %% From 14 and forward new float is compulsory @@ -403,31 +429,61 @@ recv_status(Socket) -> exit(Bad) end. -send_challenge(Socket, Node, Challenge, Version) -> - send_challenge(Socket, Node, Challenge, Version, ?COMPULSORY_DFLAGS). -send_challenge(Socket, Node, Challenge, Version, Flags) -> - {ok, {{_Ip1,_Ip2,_Ip3,_Ip4}, _}} = inet:sockname(Socket), - ?to_port(Socket, [$n,?int16(Version),?int32(Flags), - ?int32(Challenge), atom_to_list(Node)]). - -recv_challenge(Socket) -> +send_challenge(Socket, Node, Challenge, OurVer) -> + send_challenge(Socket, Node, Challenge, OurVer, ?COMPULSORY_DFLAGS). + +send_challenge(Socket, Node, Challenge, OurVer, Flags) -> + if OurVer =:= 5 -> + ?to_port(Socket, [$n, ?int16(OurVer), ?int32(Flags), + ?int32(Challenge), atom_to_list(Node)]); + OurVer >= 6 -> + NodeName = atom_to_binary(Node, latin1), + NameLen = byte_size(NodeName), + Creation = erts_internal:get_creation(), + ?to_port(Socket, [$N, + <<(Flags bor ?DFLAG_HANDSHAKE_23):64, + Challenge:32, + Creation:32, + NameLen:16>>, + NodeName]) + end. + +recv_challenge(Socket, OurVer) -> case gen_tcp:recv(Socket, 0) of {ok,[$n,V1,V0,Fl1,Fl2,Fl3,Fl4,CA3,CA2,CA1,CA0 | Ns]} -> + 5 = OurVer, Flags = ?u32(Fl1,Fl2,Fl3,Fl4), - Type = case Flags band ?DFLAG_PUBLISHED of - 0 -> - hidden; - _ -> - normal - end, + Type = flags_to_type(Flags), Node =list_to_atom(Ns), - Version = ?u16(V1,V0), + OurVer = ?u16(V1,V0), % echoed back + Challenge = ?u32(CA3,CA2,CA1,CA0), + {Type,Node,Challenge}; + + {ok,[$N, F7,F6,F5,F4,F3,F2,F1,F0, CA3,CA2,CA1,CA0, + _Cr3,_Cr2,_Cr1,_Cr0, NL1,NL0 | Rest]} -> + true = (OurVer >= 6), + <<Flags:64>> = <<F7,F6,F5,F4,F3,F2,F1,F0>>, + Type = flags_to_type(Flags), + NameLen = ?u16(NL1,NL0), + {NodeName,_} = lists:split(NameLen, Rest), + Node = list_to_atom(NodeName), Challenge = ?u32(CA3,CA2,CA1,CA0), - {Type,Node,Version,Challenge}; + %%Creation = ?u32(Cr3,Cr2,Cr1,Cr0), + %%true = (Creation =/= 0), + {Type,Node,Challenge}; + _ -> ?shutdown(no_node) end. +flags_to_type(Flags) -> + case Flags band ?DFLAG_PUBLISHED of + 0 -> + hidden; + _ -> + normal + end. + %send_challenge_reply(Socket, Challenge, Digest) -> % ?to_port(Socket, [$r,?int32(Challenge),Digest]). @@ -441,8 +497,8 @@ recv_challenge_reply(Socket, ChallengeA, Cookie) -> true -> ?shutdown(bad_challenge_reply) end; - _ -> - ?shutdown(no_node) + Other -> + ?shutdown({recv_challenge_reply,Other}) end. send_challenge_ack(Socket, Digest) -> @@ -461,37 +517,53 @@ send_challenge_ack(Socket, Digest) -> % ?shutdown(bad_challenge_ack) % end. -send_name(Socket, MyNode0, Version) -> - send_name(Socket, MyNode0, Version, ?COMPULSORY_DFLAGS). -send_name(Socket, MyNode0, Version, Flags) -> - MyNode = atom_to_list(MyNode0), - ?to_port(Socket, [$n,?int16(Version),?int32(Flags)] ++ - MyNode). +send_name(Socket, MyNode, OurVer, AssumedVer) -> + Flags = ?COMPULSORY_DFLAGS bor (case OurVer of + 5 -> 0; + 6 -> ?DFLAG_HANDSHAKE_23 + end), + send_name(Socket, MyNode, OurVer, AssumedVer, Flags). + +send_name(Socket, MyNode, OurVer, AssumedVer, Flags) -> + NodeName = atom_to_binary(MyNode, latin1), + if AssumedVer =:= 5 -> + ?to_port(Socket, [$n,?int16(OurVer),?int32(Flags),NodeName]); + AssumedVer >= 6 -> + Creation = erts_internal:get_creation(), + ?to_port(Socket, [$N, + <<Flags:64, + Creation:32, + (byte_size(NodeName)):16>>, + NodeName]) + end. -%% -%% recv_name is common for both old and new handshake. -%% -recv_name(Socket) -> +recv_name(Socket, OurEpmdVer) -> case gen_tcp:recv(Socket, 0) of - {ok,Data} -> - get_name(Data); + {ok,[$n, V1,V0, F3,F2,F1,F0 | OtherNode]} -> + 5 = OurEpmdVer, + 5 = ?u16(V1,V0), + Type = flags_to_type(?u32(F3,F2,F1,F0)), + {Type, list_to_atom(OtherNode)}; + {ok,[$N, F7,F6,F5,F4,F3,F2,F1,F0, _Cr3,_Cr2,_Cr1,_Cr0, NL1, NL0 | Rest]} -> + true = (OurEpmdVer >= 6), + {OtherNode, _Residue} = lists:split(?u16(NL1,NL0), Rest), + <<Flags:64>> = <<F7,F6,F5,F4,F3,F2,F1,F0>>, + Type = flags_to_type(Flags), + {Type, list_to_atom(OtherNode)}; Res -> ?shutdown({no_node,Res}) end. -get_name([$m,VersionA,VersionB,_Ip1,_Ip2,_Ip3,_Ip4|OtherNode]) -> - {normal, list_to_atom(OtherNode), ?u16(VersionA,VersionB)}; -get_name([$h,VersionA,VersionB,_Ip1,_Ip2,_Ip3,_Ip4|OtherNode]) -> - {hidden, list_to_atom(OtherNode), ?u16(VersionA,VersionB)}; -get_name([$n,VersionA, VersionB, Flag1, Flag2, Flag3, Flag4 | OtherNode]) -> - Type = case ?u32(Flag1, Flag2, Flag3, Flag4) band ?DFLAG_PUBLISHED of - 0 -> hidden; - _ -> normal - end, - {Type, list_to_atom(OtherNode), - ?u16(VersionA,VersionB)}; -get_name(Data) -> - ?shutdown(Data). +recv_complement(Socket, OurVer, 5) when OurVer > 5 -> + case gen_tcp:recv(Socket, 0) of + {ok,[$c, _F7,_F6,_F5,_F4, _Cr3,_Cr2,_Cr1,_Cr0]} -> + ok; + Res -> + ?shutdown({no_node,Res}) + end; +recv_complement(_, _OurVer, _OurEpmdVer) -> + ok. + %% %% tell_name is for old handshake @@ -536,13 +608,10 @@ wait_for_reg_reply(Socket, SoFar) -> receive {tcp, Socket, Data0} -> case SoFar ++ Data0 of - [$y, Result, A, B] -> - case Result of - 0 -> - {alive, Socket, ?u16(A, B)}; - _ -> - {error, duplicate_name} - end; + [$y, 0, Cr1,Cr0] -> + {alive, Socket, ?u16(Cr1,Cr0)}; + [$v, 0, Cr3,Cr2,Cr1,Cr0] -> + {alive, Socket, ?u32(Cr3,Cr2,Cr1,Cr0)}; Data when length(Data) < 4 -> wait_for_reg_reply(Socket, Data); Garbage -> @@ -556,9 +625,9 @@ wait_for_reg_reply(Socket, SoFar) -> end. -register(NodeName, ListenSocket, VLow, VHigh) -> +epmd_register(NodeName, ListenSocket, OurVer) -> {ok,{_,TcpPort}} = inet:sockname(ListenSocket), - case do_register_node(NodeName, TcpPort, VLow, VHigh) of + case do_register_node(NodeName, TcpPort, ?EI_DIST_LOW, OurVer) of {alive, Socket, _Creation} -> Socket; Other -> diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java index 0bf3ca2a67..26f6ffcd97 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java @@ -147,21 +147,6 @@ public abstract class AbstractConnection extends Thread { if (traceLevel >= handshakeThreshold) { System.out.println("<- ACCEPT FROM " + s); } - - // get his info - recvName(peer); - - // now find highest common dist value - if (peer.proto != self.proto || self.distHigh < peer.distLow - || self.distLow > peer.distHigh) { - close(); - throw new IOException( - "No common protocol found - cannot accept connection"); - } - // highest common version: min(peer.distHigh, self.distHigh) - peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh - : peer.distHigh; - doAccept(); name = peer.node(); } @@ -953,10 +938,12 @@ public abstract class AbstractConnection extends Thread { } protected void doAccept() throws IOException, OtpAuthException { + final int send_name_tag = recvName(peer); try { sendStatus("ok"); final int our_challenge = genChallenge(); - sendChallenge(peer.distChoose, localNode.flags, our_challenge); + sendChallenge(peer.flags, localNode.flags, our_challenge); + recvComplement(send_name_tag); final int her_challenge = recvChallengeReply(our_challenge); final byte[] our_digest = genDigest(her_challenge, localNode.cookie()); @@ -992,12 +979,14 @@ public abstract class AbstractConnection extends Thread { System.out.println("-> MD5 CONNECT TO " + peer.host() + ":" + port); } - sendName(peer.distChoose, localNode.flags); + final int send_name_tag = sendName(peer.distChoose, localNode.flags, + localNode.creation); recvStatus(); final int her_challenge = recvChallenge(); final byte[] our_digest = genDigest(her_challenge, localNode.cookie()); final int our_challenge = genChallenge(); + sendComplement(send_name_tag); sendChallengeReply(our_challenge, our_digest); recvChallengeAck(our_challenge); cookieOk = true; @@ -1070,17 +1059,31 @@ public abstract class AbstractConnection extends Thread { return res; } - protected void sendName(final int dist, final int aflags) + protected int sendName(final int dist, final long aflags, + final int creation) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); final String str = localNode.node(); - obuf.write2BE(str.length() + 7); // 7 bytes + nodename - obuf.write1(AbstractNode.NTYPE_R6); - obuf.write2BE(dist); - obuf.write4BE(aflags); - obuf.write(str.getBytes()); + int send_name_tag; + if (dist == 5) { + obuf.write2BE(1+2+4 + str.length()); + send_name_tag = 'n'; + obuf.write1(send_name_tag); + obuf.write2BE(dist); + obuf.write4BE(aflags); + obuf.write(str.getBytes()); + } + else { + obuf.write2BE(1+8+4+2 + str.length()); + send_name_tag = 'N'; + obuf.write1(send_name_tag); + obuf.write8BE(aflags); + obuf.write4BE(creation); + obuf.write2BE(str.length()); + obuf.write(str.getBytes()); + } obuf.writeToAndFlush(socket.getOutputStream()); @@ -1088,26 +1091,61 @@ public abstract class AbstractConnection extends Thread { System.out.println("-> " + "HANDSHAKE sendName" + " flags=" + aflags + " dist=" + dist + " local=" + localNode); } + return send_name_tag; } - protected void sendChallenge(final int dist, final int aflags, - final int challenge) throws IOException { + protected void sendComplement(final int send_name_tag) + throws IOException { + + if (send_name_tag == 'n' && + (peer.flags & AbstractNode.dFlagHandshake23) != 0) { + @SuppressWarnings("resource") + final OtpOutputStream obuf = new OtpOutputStream(); + obuf.write2BE(1+4+4); + obuf.write1('c'); + final int flagsHigh = (int)(localNode.flags >> 32); + obuf.write4BE(flagsHigh); + obuf.write4BE(localNode.creation); + + obuf.writeToAndFlush(socket.getOutputStream()); + + if (traceLevel >= handshakeThreshold) { + System.out.println("-> " + "HANDSHAKE sendComplement" + + " flagsHigh=" + flagsHigh + + " creation=" + localNode.creation); + } + } + } + + protected void sendChallenge(final long her_flags, final long our_flags, + final int challenge) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); final String str = localNode.node(); - obuf.write2BE(str.length() + 11); // 11 bytes + nodename - obuf.write1(AbstractNode.NTYPE_R6); - obuf.write2BE(dist); - obuf.write4BE(aflags); - obuf.write4BE(challenge); - obuf.write(str.getBytes()); + if ((her_flags & AbstractNode.dFlagHandshake23) == 0) { + obuf.write2BE(1+2+4+4 + str.length()); + obuf.write1('n'); + obuf.write2BE(5); + obuf.write4BE(our_flags & 0xffffffff); + obuf.write4BE(challenge); + obuf.write(str.getBytes()); + } + else { + obuf.write2BE(1+8+4+4+2 + str.length()); + obuf.write1('N'); + obuf.write8BE(our_flags); + obuf.write4BE(challenge); + obuf.write4BE(localNode.creation); + obuf.write2BE(str.length()); + obuf.write(str.getBytes()); + } obuf.writeToAndFlush(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags=" - + aflags + " dist=" + dist + " challenge=" + challenge + + our_flags + " challenge=" + challenge + " local=" + localNode); } } @@ -1127,8 +1165,8 @@ public abstract class AbstractConnection extends Thread { return tmpbuf; } - protected void recvName(final OtpPeer apeer) throws IOException { - + protected int recvName(final OtpPeer apeer) throws IOException { + int send_name_tag; String hisname = ""; try { @@ -1137,25 +1175,31 @@ public abstract class AbstractConnection extends Thread { final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0); byte[] tmpname; final int len = tmpbuf.length; - apeer.ntype = ibuf.read1(); - if (apeer.ntype != AbstractNode.NTYPE_R6) { + send_name_tag = ibuf.read1(); + switch (send_name_tag) { + case 'n': + apeer.distLow = apeer.distHigh = ibuf.read2BE(); + if (apeer.distLow != 5) + throw new IOException("Invalid handshake version"); + apeer.flags = ibuf.read4BE(); + tmpname = new byte[len - 7]; + ibuf.readN(tmpname); + hisname = OtpErlangString.newString(tmpname); + break; + case 'N': + apeer.distLow = apeer.distHigh = 6; + apeer.flags = ibuf.read8BE(); + if ((apeer.flags & AbstractNode.dFlagHandshake23) == 0) + throw new IOException("Missing DFLAG_HANDSHAKE_23"); + apeer.creation = ibuf.read4BE(); + int namelen = ibuf.read2BE(); + tmpname = new byte[namelen]; + ibuf.readN(tmpname); + hisname = OtpErlangString.newString(tmpname); + break; + default: throw new IOException("Unknown remote node type"); } - apeer.distLow = apeer.distHigh = ibuf.read2BE(); - if (apeer.distLow < 5) { - throw new IOException("Unknown remote node type"); - } - apeer.flags = ibuf.read4BE(); - tmpname = new byte[len - 7]; - ibuf.readN(tmpname); - hisname = OtpErlangString.newString(tmpname); - // Set the old nodetype parameter to indicate hidden/normal status - // When the old handshake is removed, the ntype should also be. - if ((apeer.flags & AbstractNode.dFlagPublished) != 0) { - apeer.ntype = AbstractNode.NTYPE_R4_ERLANG; - } else { - apeer.ntype = AbstractNode.NTYPE_R4_HIDDEN; - } if ((apeer.flags & AbstractNode.dFlagExtendedReferences) == 0) { throw new IOException( @@ -1180,6 +1224,7 @@ public abstract class AbstractConnection extends Thread { System.out.println("<- " + "HANDSHAKE" + " ntype=" + apeer.ntype + " dist=" + apeer.distHigh + " remote=" + apeer); } + return send_name_tag; } protected int recvChallenge() throws IOException { @@ -1190,14 +1235,31 @@ public abstract class AbstractConnection extends Thread { final byte[] buf = read2BytePackage(); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(buf, 0); - peer.ntype = ibuf.read1(); - if (peer.ntype != AbstractNode.NTYPE_R6) { + int namelen; + switch (ibuf.read1()) { + case 'n': + if (peer.distChoose != 5) + throw new IOException("Old challenge wrong version"); + peer.distLow = peer.distHigh = ibuf.read2BE(); + peer.flags = ibuf.read4BE(); + if ((peer.flags & AbstractNode.dFlagHandshake23) != 0) + throw new IOException("Old challenge unexpected DFLAG_HANDHAKE_23"); + challenge = ibuf.read4BE(); + namelen = buf.length - (1+2+4+4); + break; + case 'N': + peer.distLow = peer.distHigh = peer.distChoose = 6; + peer.flags = ibuf.read8BE(); + if ((peer.flags & AbstractNode.dFlagHandshake23) == 0) + throw new IOException("New challenge missing DFLAG_HANDHAKE_23"); + challenge = ibuf.read4BE(); + peer.creation = ibuf.read4BE(); + namelen = ibuf.read2BE(); + break; + default: throw new IOException("Unexpected peer type"); } - peer.distLow = peer.distHigh = ibuf.read2BE(); - peer.flags = ibuf.read4BE(); - challenge = ibuf.read4BE(); - final byte[] tmpname = new byte[buf.length - 11]; + final byte[] tmpname = new byte[namelen]; ibuf.readN(tmpname); final String hisname = OtpErlangString.newString(tmpname); if (!hisname.equals(peer.node)) { @@ -1228,6 +1290,27 @@ public abstract class AbstractConnection extends Thread { return challenge; } + protected void recvComplement(int send_name_tag) throws IOException { + + if (send_name_tag == 'n' && + (peer.flags & AbstractNode.dFlagHandshake23) != 0) { + try { + final byte[] tmpbuf = read2BytePackage(); + @SuppressWarnings("resource") + final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0); + if (ibuf.read1() != 'c') + throw new IOException("Not a complement tag"); + + final long flagsHigh = ibuf.read4BE(); + peer.flags |= flagsHigh << 32; + peer.creation = ibuf.read4BE(); + + } catch (final OtpErlangDecodeException e) { + throw new IOException("Handshake failed - not enough data"); + } + } + } + protected void sendChallengeReply(final int challenge, final byte[] digest) throws IOException { diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java index c3f71a84f0..fa6db9a046 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java @@ -74,10 +74,7 @@ public class AbstractNode implements OtpTransportFactory { static String defaultCookie = null; final OtpTransportFactory transportFactory; - // Node types static final int NTYPE_R6 = 110; // 'n' post-r5, all nodes - static final int NTYPE_R4_ERLANG = 109; // 'm' Only for source compatibility - static final int NTYPE_R4_HIDDEN = 104; // 'h' Only for source compatibility // Node capability flags static final int dFlagPublished = 1; @@ -96,17 +93,19 @@ public class AbstractNode implements OtpTransportFactory { static final int dFlagUtf8Atoms = 0x10000; static final int dFlagMapTag = 0x20000; static final int dFlagBigCreation = 0x40000; + static final int dFlagHandshake23 = 0x1000000; int ntype = NTYPE_R6; int proto = 0; // tcp/ip - int distHigh = 5; // Cannot talk to nodes before R6 + int distHigh = 6; int distLow = 5; // Cannot talk to nodes before R6 int creation = 0; - int flags = dFlagExtendedReferences | dFlagExtendedPidsPorts + long flags = dFlagExtendedReferences | dFlagExtendedPidsPorts | dFlagBitBinaries | dFlagNewFloats | dFlagFunTags | dflagNewFunTags | dFlagUtf8Atoms | dFlagMapTag | dFlagExportPtrTag - | dFlagBigCreation; + | dFlagBigCreation + | dFlagHandshake23; /* initialize hostname and default cookie */ static { diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java index fffb8475d3..008ee9727e 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java @@ -74,8 +74,9 @@ public class OtpEpmd { private static final byte port4req = (byte) 122; private static final byte port4resp = (byte) 119; - private static final byte publish4req = (byte) 120; - private static final byte publish4resp = (byte) 121; + private static final byte ALIVE2_REQ = (byte) 120; + private static final byte ALIVE2_RESP = (byte) 121; + private static final byte ALIVE2_X_RESP = (byte) 118; private static final byte names4req = (byte) 110; private static int traceLevel = 0; @@ -287,7 +288,7 @@ public class OtpEpmd { obuf.write2BE(node.alive().length() + 13); - obuf.write1(publish4req); + obuf.write1(ALIVE2_REQ); obuf.write2BE(node.port()); obuf.write1(node.type()); @@ -322,10 +323,11 @@ public class OtpEpmd { final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0); final int response = ibuf.read1(); - if (response == publish4resp) { + if (response == ALIVE2_RESP || response == ALIVE2_X_RESP) { final int result = ibuf.read1(); if (result == 0) { - node.creation = ibuf.read2BE(); + node.creation = (response == ALIVE2_RESP + ? ibuf.read2BE() : ibuf.read4BE()); if (traceLevel >= traceThreshold) { System.out.println("<- OK"); } diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java index 6d81ce630b..8cc5b3c21d 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java @@ -239,6 +239,20 @@ public class OtpInputStream extends ByteArrayInputStream { } /** + * Read a eight byte big endian integer from the stream. + * + * @return the bytes read, converted from big endian to a long integer. + * + * @exception OtpErlangDecodeException + * if the next byte cannot be read. + */ + public long read8BE() throws OtpErlangDecodeException { + long high = read4BE(); + long low = read4BE(); + return (high << 32) | (low & 0xffffffff); + } + + /** * Read a two byte little endian integer from the stream. * * @return the bytes read, converted from little endian to an integer. diff --git a/lib/kernel/include/dist.hrl b/lib/kernel/include/dist.hrl index e3b06ddee7..3cc825fca6 100644 --- a/lib/kernel/include/dist.hrl +++ b/lib/kernel/include/dist.hrl @@ -44,8 +44,18 @@ -define(DFLAG_BIG_SEQTRACE_LABELS, 16#100000). %% -define(DFLAG_NO_MAGIC, 16#200000). %% Used internally only -define(DFLAG_EXIT_PAYLOAD, 16#400000). --define(DFLAG_FRAGMENTS, 16#800000). --define(DFLAG_SPAWN, 16#1000000). +-define(DFLAG_FRAGMENTS, 16#00800000). +-define(DFLAG_HANDSHAKE_23, 16#01000000). +-define(DFLAG_RESERVED, 16#fe000000). +-define(DFLAG_SPAWN, 16#100000000). %% Also update dflag2str() in ../src/dist_util.erl %% when adding flags... + + +-define(ERL_DIST_VER_5, 5). % OTP-22 or (much) older +-define(ERL_DIST_VER_6, 6). % OTP-23 (or maybe newer?) + +-define(ERL_DIST_VER_LOW, ?ERL_DIST_VER_5). +-define(ERL_DIST_VER_HIGH, ?ERL_DIST_VER_6). + diff --git a/lib/kernel/include/dist_util.hrl b/lib/kernel/include/dist_util.hrl index 56f775f060..05c7eee795 100644 --- a/lib/kernel/include/dist_util.hrl +++ b/lib/kernel/include/dist_util.hrl @@ -84,7 +84,10 @@ f_handshake_complete, %% Notify handshake complete add_flags, %% dflags to add reject_flags, %% dflags not to use (not all can be rejected) - require_flags %% dflags that are required + require_flags, %% dflags that are required + + %% New in kernel-@master@ (OTP-23.0) + other_creation }). diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl index 9efdf7ed70..9495ac282a 100644 --- a/lib/kernel/src/dist_util.erl +++ b/lib/kernel/src/dist_util.erl @@ -70,6 +70,8 @@ -define(u32(X3,X2,X1,X0), (((X3) bsl 24) bor ((X2) bsl 16) bor ((X1) bsl 8) bor (X0))). +-define(CREATION_UNKNOWN,0). + -record(tick, {read = 0, write = 0, tick = 0, @@ -120,6 +122,8 @@ dflag2str(?DFLAG_EXIT_PAYLOAD) -> "EXIT_PAYLOAD"; dflag2str(?DFLAG_FRAGMENTS) -> "FRAGMENTS"; +dflag2str(?DFLAG_HANDSHAKE_23) -> + "HANDSHAKE_23"; dflag2str(?DFLAG_SPAWN) -> "SPAWN"; dflag2str(_) -> @@ -181,30 +185,35 @@ handshake_other_started(#hs_data{request_type=ReqType, AddFlgs = convert_flags(AddFlgs0), RejFlgs = convert_flags(RejFlgs0), ReqFlgs = convert_flags(ReqFlgs0), - {PreOtherFlags,Node,Version} = recv_name(HSData0), + {PreOtherFlags,Node,Creation,SendNameVersion} = recv_name(HSData0), EDF = erts_internal:get_dflags(), PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node, EDF), - ChosenFlags = adjust_flags(PreThisFlags, PreOtherFlags), - HSData = HSData0#hs_data{this_flags=ChosenFlags, - other_flags=ChosenFlags, - other_version=Version, - other_node=Node, - other_started=true, - add_flags=AddFlgs, - reject_flags=RejFlgs, - require_flags=ReqFlgs}, - check_dflags(HSData, EDF), - ?debug({"MD5 connection from ~p (V~p)~n", - [Node, HSData#hs_data.other_version]}), - mark_pending(HSData), + HSData1 = HSData0#hs_data{this_flags=PreThisFlags, + other_flags=PreOtherFlags, + other_version=flags_to_version(PreOtherFlags), + other_node=Node, + other_started=true, + other_creation=Creation, + add_flags=AddFlgs, + reject_flags=RejFlgs, + require_flags=ReqFlgs}, + check_dflags(HSData1, EDF), + ?debug({"MD5 connection from ~p~n", [Node]}), + mark_pending(HSData1), {MyCookie,HisCookie} = get_cookies(Node), ChallengeA = gen_challenge(), - send_challenge(HSData, ChallengeA), - reset_timer(HSData#hs_data.timer), - ChallengeB = recv_challenge_reply(HSData, ChallengeA, MyCookie), - send_challenge_ack(HSData, gen_digest(ChallengeB, HisCookie)), + send_challenge(HSData1, ChallengeA), + reset_timer(HSData1#hs_data.timer), + HSData2 = recv_complement(HSData1, SendNameVersion), + check_dflags(HSData2, EDF), + ChosenFlags = adjust_flags(HSData2#hs_data.this_flags, + HSData2#hs_data.other_flags), + HSData3 = HSData2#hs_data{this_flags = ChosenFlags, + other_flags = ChosenFlags}, + ChallengeB = recv_challenge_reply(HSData3, ChallengeA, MyCookie), + send_challenge_ack(HSData3, gen_digest(ChallengeB, HisCookie)), ?debug({dist_util, self(), accept_connection, Node}), - connection(HSData); + connection(HSData3); handshake_other_started(OldHsData) when element(1,OldHsData) =:= hs_data -> handshake_other_started(convert_old_hsdata(OldHsData)). @@ -381,16 +390,19 @@ handshake_we_started(#hs_data{request_type=ReqType, add_flags = AddFlgs, reject_flags = RejFlgs, require_flags = ReqFlgs}, - send_name(HSData), + SendNameVersion = send_name(HSData), recv_status(HSData), - {PreOtherFlags,ChallengeA} = recv_challenge(HSData), + {PreOtherFlags, ChallengeA, Creation} = recv_challenge(HSData), ChosenFlags = adjust_flags(PreThisFlags, PreOtherFlags), NewHSData = HSData#hs_data{this_flags = ChosenFlags, other_flags = ChosenFlags, - other_started = false}, + other_started = false, + other_version = flags_to_version(PreOtherFlags), + other_creation = Creation}, check_dflags(NewHSData, EDF), MyChallenge = gen_challenge(), {MyCookie,HisCookie} = get_cookies(Node), + send_complement(NewHSData, SendNameVersion), send_challenge_reply(NewHSData,MyChallenge, gen_digest(ChallengeA,HisCookie)), reset_timer(NewHSData#hs_data.timer), @@ -411,6 +423,16 @@ convert_flags(Flags) when is_integer(Flags) -> convert_flags(_Undefined) -> 0. +flags_to_version(Flags) -> + case Flags band ?DFLAG_HANDSHAKE_23 of + 0 -> + ?ERL_DIST_VER_5; + ?DFLAG_HANDSHAKE_23 -> + ?ERL_DIST_VER_6 + end. + + + %% -------------------------------------------------------------- %% The connection has been established. %% -------------------------------------------------------------- @@ -488,15 +510,14 @@ get_cookies(Node) -> %% No error return; either succeeds or terminates the process. do_setnode(#hs_data{other_node = Node, socket = Socket, other_flags = Flags, other_version = Version, - f_getll = GetLL}) -> + f_getll = GetLL, + other_creation = Creation}) -> case GetLL(Socket) of {ok,Port} -> - ?trace("setnode(md5,~p ~p ~p)~n", - [Node, Port, {publish_type(Flags), - '(', Flags, ')', - Version}]), + ?trace("setnode: node=~p port=~p flags=~p(~p) ver=~p creation=~p~n", + [Node, Port, Flags, publish_type(Flags), Version, Creation]), try - erlang:setnode(Node, Port, {Flags, Version, '', ''}) + erlang:setnode(Node, Port, {Flags, Version, Creation}) catch error:system_limit -> error_msg("** Distribution system limit reached, " @@ -603,21 +624,77 @@ send_name(#hs_data{socket = Socket, this_node = Node, f_send = FSend, this_flags = Flags, other_version = Version}) -> - ?trace("send_name: node=~w, version=~w\n", - [Node,Version]), - ?to_port(FSend, Socket, - [$n, ?int16(Version), ?int32(Flags), atom_to_list(Node)]). + NameBin = atom_to_binary(Node, latin1), + if Version =:= undefined; + Version =:= ?ERL_DIST_VER_5 -> + %% We treat "5" the same as 'undefined' as there are + %% custom made epmd modules out there with a hardcoded "5". + %% + %% Send old 'n' message but with DFLAG_HANDSHAKE_23 + %% Old nodes will ignore DFLAG_HANDSHAKE_23 and reply old 'n' challenge. + %% New nodes will see DFLAG_HANDSHAKE_23 and reply new 'N' challenge. + ?trace("send_name: 'n' node=~p, version=~w\n", + [Node, ?ERL_DIST_VER_5]), + _ = ?to_port(FSend, Socket, + [<<$n, ?ERL_DIST_VER_5:16, Flags:32>>, NameBin]), + ?ERL_DIST_VER_5; + + is_integer(Version), Version >= ?ERL_DIST_VER_6 -> + Creation = erts_internal:get_creation(), + NameLen = byte_size(NameBin), + ?trace("send_name: 'N' node=~p creation=~w\n", + [Node, Creation]), + _ = ?to_port(FSend, Socket, + [<<$N, Flags:64, Creation:32, NameLen:16>>, NameBin]), + ?ERL_DIST_VER_6 + end. send_challenge(#hs_data{socket = Socket, this_node = Node, - other_version = Version, - this_flags = Flags, + this_flags = ThisFlags, + other_flags = OtherFlags, f_send = FSend}, Challenge ) -> - ?trace("send: challenge=~w version=~w\n", - [Challenge,Version]), - ?to_port(FSend, Socket, [$n,?int16(Version), ?int32(Flags), - ?int32(Challenge), - atom_to_list(Node)]). + case OtherFlags band ?DFLAG_HANDSHAKE_23 of + 0 -> + %% Reply with old 'n' message + ?trace("send: 'n' challenge=~w\n", [Challenge]), + + ?to_port(FSend, Socket, [<<$n, + ?ERL_DIST_VER_5:16, % echo same Version back + ThisFlags:32, + Challenge:32>>, + atom_to_list(Node)]); + + ?DFLAG_HANDSHAKE_23 -> + %% Reply with new 'N' message + Creation = erts_internal:get_creation(), + NodeName = atom_to_binary(Node, latin1), + NameLen = byte_size(NodeName), + ?trace("send: 'N' challenge=~w creation=~w\n", + [Challenge,Creation]), + ?to_port(FSend, Socket, [<<$N, + ThisFlags:64, + Challenge:32, + Creation:32, + NameLen:16>>, NodeName]) + end. + +send_complement(#hs_data{socket = Socket, + f_send = FSend, + this_flags = Flags, + other_flags = Flags}, + SendNameVersion) -> + if SendNameVersion =:= ?ERL_DIST_VER_5, + (Flags band ?DFLAG_HANDSHAKE_23) =/= 0 -> + %% We sent an old 'n' name message and need to complement + %% with creation value. + Creation = erts_internal:get_creation(), + FlagsHigh = Flags bsr 32, + ?trace("send_complement: 'c' flags_high=~w creation=~w\n", [FlagsHigh,Creation]), + ?to_port(FSend, Socket, [<<$c, FlagsHigh:32, Creation:32>>]); + true-> + ok % no complement msg needed + end. send_challenge_reply(#hs_data{socket = Socket, f_send = FSend}, Challenge, Digest) -> @@ -632,31 +709,50 @@ send_challenge_ack(#hs_data{socket = Socket, f_send = FSend}, %% -%% Get the name of the other side. +%% Receive first handshake message sent from connecting side. %% Close the connection if invalid data. -%% The IP address sent is not interesting (as in the old -%% tcp_drv.c which used it to detect simultaneous connection -%% attempts). %% recv_name(#hs_data{socket = Socket, f_recv = Recv} = HSData) -> case Recv(Socket, 0, infinity) of - {ok, - [$n,VersionA, VersionB, Flag1, Flag2, Flag3, Flag4 - | OtherNode] = Data} -> - case is_node_name(OtherNode) of - true -> - Flags = ?u32(Flag1, Flag2, Flag3, Flag4), - Version = ?u16(VersionA,VersionB), - is_allowed(HSData, Flags, OtherNode, Version); - false -> - ?shutdown(Data) - end; + {ok, [$n | _] = Data} -> + recv_name_old(HSData, Data); + {ok, [$N | _] = Data} -> + recv_name_new(HSData, Data); _ -> ?shutdown(no_node) end. -is_node_name(OtherNodeName) -> - case string:split(OtherNodeName, "@", all) of +recv_name_old(HSData, + [$n, V1, V0, F3, F2, F1, F0 | Node] = Data) -> + <<_Version:16>> = <<V1,V0>>, + <<Flags:32>> = <<F3,F2,F1,F0>>, + ?trace("recv_name: 'n' node=~p version=~w\n", [Node, _Version]), + case is_node_name(Node) of + true -> + check_allowed(HSData, Node), + {Flags, list_to_atom(Node), ?CREATION_UNKNOWN, ?ERL_DIST_VER_5}; + false -> + ?shutdown(Data) + end. + +recv_name_new(HSData, + [$N, F7,F6,F5,F4,F3,F2,F1,F0, Cr3,Cr2,Cr1,Cr0, + NL1, NL0 | Rest] = Data) -> + <<Flags:64>> = <<F7,F6,F5,F4,F3,F2,F1,F0>>, + <<Creation:32>> = <<Cr3,Cr2,Cr1,Cr0>>, + <<NameLen:16>> = <<NL1,NL0>>, + {Node, _Residue} = lists:split(NameLen, Rest), + ?trace("recv_name: 'N' node=~p creation=~w\n", [Node, Creation]), + case is_node_name(Node) of + true -> + check_allowed(HSData, Node), + {Flags, list_to_atom(Node), Creation, ?ERL_DIST_VER_6}; + false -> + ?shutdown(Data) + end. + +is_node_name(NodeName) -> + case string:split(NodeName, "@", all) of [Name,Host] -> (not string:is_empty(Name)) andalso (not string:is_empty(Host)); @@ -692,12 +788,12 @@ split_node(Node) -> %% with allow-node-scheme. An empty allowed list %% allows all nodes. %% -is_allowed(#hs_data{allowed = []}, Flags, Node, Version) -> - {Flags,list_to_atom(Node),Version}; -is_allowed(#hs_data{allowed = Allowed} = HSData, Flags, Node, Version) -> +check_allowed(#hs_data{allowed = []}, _Node) -> + ok; +check_allowed(#hs_data{allowed = Allowed} = HSData, Node) -> case is_allowed(Node, Allowed) of true -> - {Flags,list_to_atom(Node),Version}; + ok; false -> send_status(HSData#hs_data{other_node = Node}, not_allowed), error_msg("** Connection attempt from " @@ -771,25 +867,91 @@ publish_type(Flags) -> end. %% wait for challenge after connect -recv_challenge(#hs_data{socket=Socket,other_node=Node, - other_version=Version,f_recv=Recv}) -> +recv_challenge(#hs_data{socket=Socket, f_recv=Recv}=HSData) -> case Recv(Socket, 0, infinity) of - {ok,[$n,V1,V0,Fl1,Fl2,Fl3,Fl4,CA3,CA2,CA1,CA0 | Ns]} -> - Flags = ?u32(Fl1,Fl2,Fl3,Fl4), - try {list_to_existing_atom(Ns),?u16(V1,V0)} of - {Node,Version} -> - Challenge = ?u32(CA3,CA2,CA1,CA0), - ?trace("recv: node=~w, challenge=~w version=~w\n", - [Node, Challenge,Version]), - {Flags,Challenge}; - _ -> - ?shutdown2(no_node, {recv_challenge_failed, no_node, Ns}) - catch - error:badarg -> - ?shutdown2(no_node, {recv_challenge_failed, no_node, Ns}) - end; + {ok, [$n | _]=Msg} -> + recv_challenge_old(HSData, Msg); + {ok,[$N | _]=Msg} -> + recv_challenge_new(HSData, Msg); Other -> - ?shutdown2(no_node, {recv_challenge_failed, Other}) + ?shutdown2(no_node, {recv_challenge_failed, Other}) + end. + +recv_challenge_old(#hs_data{other_node=Node}, + [$n, V1,V0, F3,F2,F1,F0, C3,C2,C1,C0 | Ns]=Msg) -> + <<_Version:16>> = <<V1,V0>>, + <<Flags:32>> = <<F3,F2,F1,F0>>, + <<Challenge:32>> = <<C3,C2,C1,C0>>, + ?trace("recv: 'n' node=~p, challenge=~w version=~w\n", + [Ns, Challenge, _Version]), + try {list_to_existing_atom(Ns), Flags band ?DFLAG_HANDSHAKE_23} of + {Node, 0} -> + {Flags, Challenge, ?CREATION_UNKNOWN}; + _ -> + ?shutdown2(no_node, {recv_challenge_failed, version, Msg}) + catch + error:badarg -> + ?shutdown2(no_node, {recv_challenge_failed, no_node, Ns}) + end; +recv_challenge_old(_, Other) -> + ?shutdown2(no_node, {recv_challenge_failed, Other}). + +recv_challenge_new(#hs_data{other_node=Node}, + [$N, + F7,F6,F5,F4,F3,F2,F1,F0, + Ch3,Ch2,Ch1,Ch0, + Cr3,Cr2,Cr1,Cr0, + NL1,NL0 | Rest] = Msg) -> + <<Flags:64>> = <<F7,F6,F5,F4,F3,F2,F1,F0>>, + <<Challenge:32>> = <<Ch3,Ch2,Ch1,Ch0>>, + <<Creation:32>> = <<Cr3,Cr2,Cr1,Cr0>>, + <<NameLen:16>> = <<NL1,NL0>>, + {Ns, _Residue} = + try + lists:split(NameLen, Rest) + catch + error:badarg -> + ?shutdown2(no_node, {recv_challenge_failed, no_node, Msg}) + end, + ?trace("recv: 'N' node=~p, challenge=~w creation=~w\n", + [Ns, Challenge, Creation]), + + case Flags band ?DFLAG_HANDSHAKE_23 of + ?DFLAG_HANDSHAKE_23 -> + try list_to_existing_atom(Ns) of + Node -> + {Flags, Challenge, Creation}; + _ -> + ?shutdown2(no_node, {recv_challenge_failed, no_node, Ns}) + catch + error:badarg -> + ?shutdown2(no_node, {recv_challenge_failed, no_node, Ns}) + end; + 0 -> + ?shutdown2(no_node, {recv_challenge_failed, version, Msg}) + end; +recv_challenge_new(_, Other) -> + ?shutdown2(no_node, {recv_challenge_failed, Other}). + + +recv_complement(#hs_data{socket = Socket, + f_recv = Recv, + other_flags = Flags} = HSData, + SendNameVersion) -> + if SendNameVersion =:= ?ERL_DIST_VER_5, + (Flags band ?DFLAG_HANDSHAKE_23) =/= 0 -> + case Recv(Socket, 0, infinity) of + {ok, [$c, F7,F6,F5,F4, Cr3,Cr2,Cr1,Cr0]} -> + <<FlagsHigh:32>> = <<F7,F6,F5,F4>>, + <<Creation:32>> = <<Cr3,Cr2,Cr1,Cr0>>, + ?trace("recv_complement: creation=~w\n", [Creation]), + HSData#hs_data{other_creation = Creation, + other_flags = Flags bor (FlagsHigh bsl 32)}; + Other -> + ?shutdown2(no_node, {recv_complement_failed, Other}) + end; + true -> + HSData end. diff --git a/lib/kernel/src/erl_epmd.erl b/lib/kernel/src/erl_epmd.erl index 3357f67b01..fecb1cd3e0 100644 --- a/lib/kernel/src/erl_epmd.erl +++ b/lib/kernel/src/erl_epmd.erl @@ -29,14 +29,16 @@ -define(port_please_failure2(Term), noop). -endif. +-include("dist.hrl"). + -ifndef(erlang_daemon_port). -define(erlang_daemon_port, 4369). -endif. -ifndef(epmd_dist_high). --define(epmd_dist_high, 6). +-define(epmd_dist_high, ?ERL_DIST_VER_HIGH). -endif. -ifndef(epmd_dist_low). --define(epmd_dist_low, 5). +-define(epmd_dist_low, ?ERL_DIST_VER_LOW). -endif. %% External exports diff --git a/lib/kernel/test/erl_distribution_wb_SUITE.erl b/lib/kernel/test/erl_distribution_wb_SUITE.erl index bb42a0ac39..ca4511a19b 100644 --- a/lib/kernel/test/erl_distribution_wb_SUITE.erl +++ b/lib/kernel/test/erl_distribution_wb_SUITE.erl @@ -28,15 +28,6 @@ -export([init_per_testcase/2, end_per_testcase/2, whitebox/1, switch_options/1, missing_compulsory_dflags/1]). -%% 1) -%% -%% Connections are now always set up symmetrically with respect to -%% publication. If connecting node doesn't send DFLAG_PUBLISHED -%% the other node wont send DFLAG_PUBLISHED. If the connecting -%% node send DFLAG_PUBLISHED but the other node doesn't send -%% DFLAG_PUBLISHED, the connecting node should consider its -%% DFLAG_PUBLISHED as dropped, i.e the connecting node wont be -%% published on the other node. -define(to_port(Socket, Data), case inet_tcp:send(Socket, Data) of @@ -47,8 +38,8 @@ R end). --define(EPMD_DIST_HIGH, 6). --define(EPMD_DIST_LOW, 5). +-define(DIST_VER_HIGH, 6). +-define(DIST_VER_LOW, 5). -define(DFLAG_PUBLISHED,1). -define(DFLAG_ATOM_CACHE,2). @@ -61,6 +52,7 @@ -define(DFLAG_EXTENDED_PIDS_PORTS,16#100). -define(DFLAG_UTF8_ATOMS, 16#10000). -define(DFLAG_BIG_CREATION, 16#40000). +-define(DFLAG_HANDSHAKE_23, 16#01000000). %% From R9 and forward extended references is compulsory %% From R10 and forward extended pids and ports are compulsory @@ -137,9 +129,18 @@ whitebox(Config) when is_list(Config) -> {ok, Node} = start_node(?MODULE,""), Cookie = erlang:get_cookie(), {_,Host} = split(node()), - ok = pending_up_md5(Node, join(ccc,Host), Cookie), - ok = simultaneous_md5(Node, join('A',Host), Cookie), - ok = simultaneous_md5(Node, join(zzzzzzzzzzzzzz,Host), Cookie), + [begin + io:format("Test OurVersion=~p and TrustEpmd=~p\n", + [OurVersion, TrustEpmd]), + ok = pending_up_md5(Node, join(ccc,Host), OurVersion, + TrustEpmd, Cookie), + ok = simultaneous_md5(Node, join('A',Host), OurVersion, + TrustEpmd, Cookie), + ok = simultaneous_md5(Node, join(zzzzzzzzzzzzzz,Host), + OurVersion, TrustEpmd, Cookie) + end + || OurVersion <- lists:seq(?DIST_VER_LOW, ?DIST_VER_HIGH), + TrustEpmd <- [true, false]], stop_node(Node), ok. @@ -208,17 +209,22 @@ test_switch_active_and_packet() -> %% %% Handshake tests %% -pending_up_md5(Node,OurName,Cookie) -> +pending_up_md5(Node,OurName,OurVersion,TrustEpmd,Cookie) -> {NA,NB} = split(Node), - {port,PortNo,_} = erl_epmd:port_please(NA,NB), + {port,PortNo,EpmdSaysVersion} = erl_epmd:port_please(NA,NB), {ok, SocketA} = gen_tcp:connect(atom_to_list(NB),PortNo, [{active,false}, {packet,2}]), - send_name(SocketA,OurName, ?EPMD_DIST_HIGH), + AssumedVersion = case TrustEpmd of + true -> EpmdSaysVersion; + false -> ?DIST_VER_LOW + end, + SentNameMsg = send_name(SocketA,OurName, OurVersion, AssumedVersion), ok = recv_status(SocketA), - {hidden,Node,?EPMD_DIST_HIGH,HisChallengeA} = recv_challenge(SocketA), % See 1) + {Node,ChallengeMsg,HisChallengeA} = recv_challenge(SocketA,OurVersion), OurChallengeA = gen_challenge(), OurDigestA = gen_digest(HisChallengeA, Cookie), + send_complement(SocketA, SentNameMsg, ChallengeMsg, OurVersion), send_challenge_reply(SocketA, OurChallengeA, OurDigestA), ok = recv_challenge_ack(SocketA, OurChallengeA, Cookie), %%% @@ -230,13 +236,14 @@ pending_up_md5(Node,OurName,Cookie) -> {ok, SocketB} = gen_tcp:connect(atom_to_list(NB),PortNo, [{active,false}, {packet,2}]), - send_name(SocketB,OurName, ?EPMD_DIST_HIGH), + SentNameMsg = send_name(SocketB,OurName, OurVersion, AssumedVersion), alive = recv_status(SocketB), send_status(SocketB, true), gen_tcp:close(SocketA), - {hidden,Node,?EPMD_DIST_HIGH,HisChallengeB} = recv_challenge(SocketB), % See 1) + {Node,ChallengeMsg,HisChallengeB} = recv_challenge(SocketB,OurVersion), OurChallengeB = gen_challenge(), OurDigestB = gen_digest(HisChallengeB, Cookie), + send_complement(SocketB, SentNameMsg, ChallengeMsg, OurVersion), send_challenge_reply(SocketB, OurChallengeB, OurDigestB), ok = recv_challenge_ack(SocketB, OurChallengeB, Cookie), %%% @@ -252,7 +259,7 @@ pending_up_md5(Node,OurName,Cookie) -> gen_tcp:close(SocketB), ok. -simultaneous_md5(Node, OurName, Cookie) when OurName < Node -> +simultaneous_md5(Node, OurName, OurVersion, TrustEpmd, Cookie) when OurName < Node -> pong = net_adm:ping(Node), LSocket = case gen_tcp:listen(0, [{active, false}, {packet,2}]) of {ok, Socket} -> @@ -260,15 +267,19 @@ simultaneous_md5(Node, OurName, Cookie) when OurName < Node -> Else -> exit(Else) end, - EpmdSocket = register_node(OurName, LSocket, ?EPMD_DIST_LOW, ?EPMD_DIST_HIGH), + EpmdSocket = register_node(OurName, LSocket, ?DIST_VER_LOW, ?DIST_VER_LOW), {NA, NB} = split(Node), rpc:cast(Node, net_adm, ping, [OurName]), receive after 1000 -> ok end, - {port, PortNo, _} = erl_epmd:port_please(NA,NB), + {port, PortNo, EpmdSaysVersion} = erl_epmd:port_please(NA,NB), {ok, SocketA} = gen_tcp:connect(atom_to_list(NB),PortNo, [{active,false}, {packet,2}]), - send_name(SocketA,OurName, ?EPMD_DIST_HIGH), + AssumedVersion = case TrustEpmd of + true -> EpmdSaysVersion; + false -> ?DIST_VER_LOW + end, + send_name(SocketA,OurName, OurVersion, AssumedVersion), %% We are still not marked up on the other side, as our first message %% is not sent. SocketB = case gen_tcp:accept(LSocket) of @@ -281,10 +292,12 @@ simultaneous_md5(Node, OurName, Cookie) when OurName < Node -> %% Now we are expected to close A gen_tcp:close(SocketA), %% But still Socket B will continue - {normal,Node,?EPMD_DIST_HIGH} = recv_name(SocketB), % See 1) + {Node,GotNameMsg,GotFlags} = recv_name(SocketB), + true = (GotFlags band ?DFLAG_HANDSHAKE_23) =/= 0, send_status(SocketB, ok_simultaneous), MyChallengeB = gen_challenge(), - send_challenge(SocketB, OurName, MyChallengeB, ?EPMD_DIST_HIGH), + send_challenge(SocketB, OurName, MyChallengeB, OurVersion, GotFlags), + recv_complement(SocketB, GotNameMsg, OurVersion), {ok,HisChallengeB} = recv_challenge_reply(SocketB, MyChallengeB, Cookie), DigestB = gen_digest(HisChallengeB,Cookie), send_challenge_ack(SocketB, DigestB), @@ -299,7 +312,7 @@ simultaneous_md5(Node, OurName, Cookie) when OurName < Node -> gen_tcp:close(EpmdSocket), ok; -simultaneous_md5(Node, OurName, Cookie) when OurName > Node -> +simultaneous_md5(Node, OurName, OurVersion, TrustEpmd, Cookie) when OurName > Node -> pong = net_adm:ping(Node), LSocket = case gen_tcp:listen(0, [{active, false}, {packet,2}]) of {ok, Socket} -> @@ -308,11 +321,11 @@ simultaneous_md5(Node, OurName, Cookie) when OurName > Node -> exit(Else) end, EpmdSocket = register_node(OurName, LSocket, - ?EPMD_DIST_LOW, ?EPMD_DIST_HIGH), + ?DIST_VER_LOW, ?DIST_VER_LOW), {NA, NB} = split(Node), rpc:cast(Node, net_adm, ping, [OurName]), receive after 1000 -> ok end, - {port, PortNo, _} = erl_epmd:port_please(NA,NB), + {port, PortNo, EpmdSaysVersion} = erl_epmd:port_please(NA,NB), {ok, SocketA} = gen_tcp:connect(atom_to_list(NB),PortNo, [{active,false}, {packet,2}]), @@ -322,15 +335,21 @@ simultaneous_md5(Node, OurName, Cookie) when OurName > Node -> Else2 -> exit(Else2) end, - send_name(SocketA,OurName, ?EPMD_DIST_HIGH), + AssumedVersion = case TrustEpmd of + true -> EpmdSaysVersion; + false -> ?DIST_VER_LOW + end, + SentNameMsg = send_name(SocketA,OurName, OurVersion, AssumedVersion), ok_simultaneous = recv_status(SocketA), %% Socket B should die during this case catch begin - {normal,Node,?EPMD_DIST_HIGH} = recv_name(SocketB), % See 1) + {Node,GotNameMsg,GotFlagsB} = recv_name(SocketB), + true = (GotFlagsB band ?DFLAG_HANDSHAKE_23) =/= 0, send_status(SocketB, ok_simultaneous), MyChallengeB = gen_challenge(), send_challenge(SocketB, OurName, MyChallengeB, - 5), + OurVersion, GotFlagsB), + recv_complement(SocketB, GotNameMsg, OurVersion), {ok,HisChallengeB} = recv_challenge_reply( SocketB, MyChallengeB, @@ -353,9 +372,10 @@ simultaneous_md5(Node, OurName, Cookie) when OurName > Node -> end, gen_tcp:close(SocketB), %% But still Socket A will continue - {hidden,Node,?EPMD_DIST_HIGH,HisChallengeA} = recv_challenge(SocketA), % See 1) + {Node,ChallengeMsg,HisChallengeA} = recv_challenge(SocketA,OurVersion), OurChallengeA = gen_challenge(), OurDigestA = gen_digest(HisChallengeA, Cookie), + send_complement(SocketA, SentNameMsg, ChallengeMsg, OurVersion), send_challenge_reply(SocketA, OurChallengeA, OurDigestA), ok = recv_challenge_ack(SocketA, OurChallengeA, Cookie), @@ -375,13 +395,16 @@ missing_compulsory_dflags(Config) when is_list(Config) -> {ok, Node} = start_node(Name1,""), {NA,NB} = split(Node), {port,PortNo,_} = erl_epmd:port_please(NA,NB), - {ok, SocketA} = gen_tcp:connect(atom_to_list(NB),PortNo, - [{active,false}, - {packet,2}]), - BadNode = list_to_atom(atom_to_list(Name2)++"@"++atom_to_list(NB)), - send_name(SocketA,BadNode, ?EPMD_DIST_HIGH, 0), - not_allowed = recv_status(SocketA), - gen_tcp:close(SocketA), + [begin + {ok, SocketA} = gen_tcp:connect(atom_to_list(NB),PortNo, + [{active,false}, + {packet,2}]), + BadNode = list_to_atom(atom_to_list(Name2)++"@"++atom_to_list(NB)), + send_name(SocketA,BadNode, Version, Version, 0), + not_allowed = recv_status(SocketA), + gen_tcp:close(SocketA) + end + || Version <- lists:seq(?DIST_VER_LOW, ?DIST_VER_HIGH)], stop_node(Node), ok. @@ -493,31 +516,76 @@ recv_status(Socket) -> exit(Bad) end. -send_challenge(Socket, Node, Challenge, Version) -> - send_challenge(Socket, Node, Challenge, Version, ?COMPULSORY_DFLAGS). -send_challenge(Socket, Node, Challenge, Version, Flags) -> - {ok, {{_Ip1,_Ip2,_Ip3,_Ip4}, _}} = inet:sockname(Socket), - ?to_port(Socket, [$n,?int16(Version),?int32(Flags), - ?int32(Challenge), atom_to_list(Node)]). +send_challenge(Socket, Node, Challenge, Version, GotFlags) -> + send_challenge(Socket, Node, Challenge, Version, GotFlags, ?COMPULSORY_DFLAGS). -recv_challenge(Socket) -> - case gen_tcp:recv(Socket, 0) of - {ok,[$n,V1,V0,Fl1,Fl2,Fl3,Fl4,CA3,CA2,CA1,CA0 | Ns]} -> +send_challenge(Socket, Node, Challenge, ?DIST_VER_LOW, _GotFlags, Flags) -> + {ok, {{_Ip1,_Ip2,_Ip3,_Ip4}, _}} = inet:sockname(Socket), + ?to_port(Socket, [$n,<<?DIST_VER_LOW:16>>,<<Flags:32>>, + <<Challenge:32>>, atom_to_list(Node)]); +send_challenge(Socket, Node, Challenge, ?DIST_VER_HIGH, GotFlags, Flags) -> + true = (GotFlags band ?DFLAG_HANDSHAKE_23) =/= 0, + {ok, {{_Ip1,_Ip2,_Ip3,_Ip4}, _}} = inet:sockname(Socket), + NodeName = atom_to_list(Node), + Nlen = length(NodeName), + Creation = erts_internal:get_creation(), + ?to_port(Socket, [$N, <<(Flags bor ?DFLAG_HANDSHAKE_23):64>>, + <<Challenge:32>>, <<Creation:32>>, + <<Nlen:16>>, NodeName + ]). + +recv_challenge(Socket, OurVersion) -> + {ok, Msg} = gen_tcp:recv(Socket, 0), + %%io:format("recv_challenge Msg=~p\n", [Msg]), + case {OurVersion, Msg} of + {?DIST_VER_LOW, + [$n,V1,V0,Fl1,Fl2,Fl3,Fl4,CA3,CA2,CA1,CA0 | Ns]} -> Flags = ?u32(Fl1,Fl2,Fl3,Fl4), - Type = case Flags band ?DFLAG_PUBLISHED of - 0 -> - hidden; - _ -> - normal - end, + true = (Flags band ?COMPULSORY_DFLAGS) =:= ?COMPULSORY_DFLAGS, Node =list_to_atom(Ns), - Version = ?u16(V1,V0), + ?DIST_VER_LOW = ?u16(V1,V0), + Challenge = ?u32(CA3,CA2,CA1,CA0), + {Node,$n,Challenge}; + + {?DIST_VER_HIGH, + [$N, F7,F6,F5,F4,F3,F2,F1,F0, CA3,CA2,CA1,CA0, + Cr3,Cr2,Cr1,Cr0, NL1,NL0 | Ns]} -> + <<Flags:64>> = <<F7,F6,F5,F4,F3,F2,F1,F0>>, + true = (Flags band ?COMPULSORY_DFLAGS) =:= ?COMPULSORY_DFLAGS, + <<Creation:32>> = <<Cr3,Cr2,Cr1,Cr0>>, + true = (Creation =/= 0), + <<NameLen:16>> = <<NL1,NL0>>, + NameLen = length(Ns), + Node = list_to_atom(Ns), Challenge = ?u32(CA3,CA2,CA1,CA0), - {Type,Node,Version,Challenge}; + {Node,$N,Challenge}; + _ -> ?shutdown(no_node) end. +send_complement(Socket, SentNameMsg, ChallengeMsg, OurVersion) -> + case {SentNameMsg,ChallengeMsg} of + {$n,$N} -> + FlagsHigh = our_flags(?COMPULSORY_DFLAGS, OurVersion) bsr 32, + ?to_port(Socket, [$c, + <<FlagsHigh:32>>, + ?int32(erts_internal:get_creation())]); + {Same,Same} -> + ok + end. + +recv_complement(Socket, $n, OurVersion) when OurVersion > ?DIST_VER_LOW -> + case gen_tcp:recv(Socket, 0) of + {ok,[$c,Cr3,Cr2,Cr1,Cr0]} -> + Creation = ?u32(Cr3,Cr2,Cr1,Cr0), + true = (Creation =/= 0); + Err -> + {error,Err} + end; +recv_complement(_, _ , _) -> + ok. + send_challenge_reply(Socket, Challenge, Digest) -> ?to_port(Socket, [$r,?int32(Challenge),Digest]). @@ -546,20 +614,34 @@ recv_challenge_ack(Socket, ChallengeB, CookieA) -> ok; true -> ?shutdown(bad_challenge_ack) - end; - _ -> - ?shutdown(bad_challenge_ack) + end end. -send_name(Socket, MyNode0, Version) -> - send_name(Socket, MyNode0, Version, ?COMPULSORY_DFLAGS). -send_name(Socket, MyNode0, Version, Flags) -> +send_name(Socket, MyNode0, OurVersion, AssumedVersion) -> + send_name(Socket, MyNode0, OurVersion, AssumedVersion, ?COMPULSORY_DFLAGS). + +send_name(Socket, MyNode0, OurVersion, AssumedVersion, Flags) -> MyNode = atom_to_list(MyNode0), - ok = ?to_port(Socket, [<<$n,Version:16,Flags:32>>|MyNode]). + if (OurVersion =:= ?DIST_VER_LOW) or + (AssumedVersion =:= ?DIST_VER_LOW) -> + OurFlags = our_flags(Flags,OurVersion), + ok = ?to_port(Socket, [<<$n,OurVersion:16,OurFlags:32>>|MyNode]), + $n; + + (OurVersion > ?DIST_VER_LOW) and + (AssumedVersion > ?DIST_VER_LOW) -> + Creation = erts_internal:get_creation(), + NameLen = length(MyNode), + ok = ?to_port(Socket, [<<$N, (Flags bor ?DFLAG_HANDSHAKE_23):64, + Creation:32,NameLen:16>>|MyNode]), + $N + end. + +our_flags(Flags, ?DIST_VER_LOW) -> + Flags; +our_flags(Flags, OurVersion) when OurVersion > ?DIST_VER_LOW -> + Flags bor ?DFLAG_HANDSHAKE_23. -%% -%% recv_name is common for both old and new handshake. -%% recv_name(Socket) -> case gen_tcp:recv(Socket, 0) of {ok,Data} -> @@ -568,19 +650,18 @@ recv_name(Socket) -> ?shutdown({no_node,Res}) end. -get_name([$m,VersionA,VersionB,_Ip1,_Ip2,_Ip3,_Ip4|OtherNode]) -> - {normal, list_to_atom(OtherNode), ?u16(VersionA,VersionB)}; -get_name([$h,VersionA,VersionB,_Ip1,_Ip2,_Ip3,_Ip4|OtherNode]) -> - {hidden, list_to_atom(OtherNode), ?u16(VersionA,VersionB)}; -get_name([$n,VersionA, VersionB, Flag1, Flag2, Flag3, Flag4 | OtherNode]) -> - Type = case ?u32(Flag1, Flag2, Flag3, Flag4) band ?DFLAG_PUBLISHED of - 0 -> - hidden; - _ -> - normal - end, - {Type, list_to_atom(OtherNode), - ?u16(VersionA,VersionB)}; +get_name([$n, V1,V0, F3,F2,F1,F0 | OtherNode]) -> + <<Version:16>> = <<V1,V0>>, + 5 = Version, + <<Flags:32>> = <<F3,F2,F1,F0>>, + {list_to_atom(OtherNode), $n, Flags}; +get_name([$N, F7,F6,F5,F4,F3,F2,F1,F0, + _C3,_C2,_C1,_C0, NLen1,NLen2 | OtherNode]) -> + <<Flags:64>> = <<F7,F6,F5,F4,F3,F2,F1,F0>>, + true = (Flags band ?DFLAG_HANDSHAKE_23) =/= 0, + <<NameLen:16>> = <<NLen1,NLen2>>, + NameLen = length(OtherNode), + {list_to_atom(OtherNode), $N, Flags}; get_name(Data) -> ?shutdown(Data). |