diff options
author | Rickard Green <rickard@erlang.org> | 2021-02-25 20:23:33 +0100 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2021-03-10 17:19:03 +0100 |
commit | 35f12b5451d9281247e713260c0bb7543bf09a90 (patch) | |
tree | aa4791d34fccff0d36154ff7a1c362e9189e150b /lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java | |
parent | f1f0bf6e2b7ae02fe57d4f42e4df5a1ecffcc147 (diff) | |
download | erlang-35f12b5451d9281247e713260c0bb7543bf09a90.tar.gz |
jinterface: Fix link protocol inconsistency bug
Diffstat (limited to 'lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java')
-rw-r--r-- | lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java | 105 |
1 files changed, 91 insertions, 14 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java index 26f6ffcd97..fb7c6869b7 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java @@ -76,6 +76,8 @@ public abstract class AbstractConnection extends Thread { protected static final int exitTTTag = 13; protected static final int regSendTTTag = 16; protected static final int exit2TTTag = 18; + protected static final int unlinkIdTag = 35; + protected static final int unlinkIdAckTag = 36; // MD5 challenge messsage tags protected static final int ChallengeReply = 'r'; @@ -355,10 +357,8 @@ public abstract class AbstractConnection extends Thread { // link to pid /** - * Create a link between the local node and the specified process on the - * remote node. If the link is still active when the remote process - * terminates, an exit signal will be sent to this connection. Use - * {@link #sendUnlink unlink()} to remove the link. + * + * Send link signal to remote process. * * @param dest * the Erlang PID of the remote process. @@ -393,9 +393,8 @@ public abstract class AbstractConnection extends Thread { } /** - * Remove a link between the local node and the specified process on the - * remote node. This method deactivates links created with {@link #sendLink - * link()}. + * + * Send unlink signal to remote process. * * @param dest * the Erlang PID of the remote process. @@ -404,7 +403,8 @@ public abstract class AbstractConnection extends Thread { * if the connection is not active or a communication error * occurs. */ - protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest) + protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest, + long unlink_id) throws IOException { if (!connected) { throw new IOException("Not connected"); @@ -417,11 +417,29 @@ public abstract class AbstractConnection extends Thread { header.write1(passThrough); header.write1(version); - // header - header.write_tuple_head(3); - header.write_long(unlinkTag); - header.write_any(from); - header.write_any(dest); + if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) { + // header + header.write_tuple_head(4); + header.write_long(unlinkIdTag); + header.write_long(unlink_id); + header.write_any(from); + header.write_any(dest); + } + else { + /* + * A node that isn't capable of talking the new link protocol. + * + * Send an old unlink op, and send ourselves an unlink-ack. We may + * end up in an inconsistent state as we could before the new link + * protocol was introduced... + */ + // header + header.write_tuple_head(3); + header.write_long(unlinkTag); + header.write_any(from); + header.write_any(dest); + deliver(new OtpMsg(unlinkIdAckTag, dest, from, unlink_id)); + } // fix up length in preamble header.poke4BE(0, header.size() - 4); @@ -429,6 +447,45 @@ public abstract class AbstractConnection extends Thread { do_send(header); } + /** + * Send unlink acknowledgment signal to remote process. + * + * @param dest + * the Erlang PID of the remote process. + * + * @exception java.io.IOException + * if the connection is not active or a communication error + * occurs. + */ + protected void sendUnlinkAck(final OtpErlangPid from, final OtpErlangPid dest, + long unlink_id) + throws IOException { + if (!connected) { + throw new IOException("Not connected"); + } + if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) { + @SuppressWarnings("resource") + final OtpOutputStream header = new OtpOutputStream(headerLen); + + // preamble: 4 byte length + "passthrough" tag + header.write4BE(0); // reserve space for length + header.write1(passThrough); + header.write1(version); + + // header + header.write_tuple_head(4); + header.write_long(unlinkIdAckTag); + header.write_long(unlink_id); + header.write_any(from); + header.write_any(dest); + // fix up length in preamble + header.poke4BE(0, header.size() - 4); + + do_send(header); + } + + } + /* used internally when "processes" terminate */ protected void sendExit(final OtpErlangPid from, final OtpErlangPid dest, final OtpErlangObject reason) throws IOException { @@ -685,7 +742,21 @@ public abstract class AbstractConnection extends Thread { from = (OtpErlangPid) head.elementAt(1); to = (OtpErlangPid) head.elementAt(2); - deliver(new OtpMsg(tag, from, to)); + deliver(new OtpMsg(tag, from, to, 0)); + break; + + case unlinkIdTag: // { UNLINK_ID, UnlinkId, FromPid, ToPid} + case unlinkIdAckTag: // { UNLINK_ID_Ack, UnlinkId, FromPid, ToPid} + if (traceLevel >= ctrlThreshold) { + System.out.println("<- " + headerType(head) + " " + + head); + } + + long unlink_id = ((OtpErlangLong) head.elementAt(1)).longValue(); + from = (OtpErlangPid) head.elementAt(2); + to = (OtpErlangPid) head.elementAt(3); + + deliver(new OtpMsg(tag, from, to, unlink_id)); break; // absolutely no idea what to do with these, so we ignore @@ -877,6 +948,12 @@ public abstract class AbstractConnection extends Thread { case unlinkTag: return "UNLINK"; + case unlinkIdTag: + return "UNLINK_ID"; + + case unlinkIdAckTag: + return "UNLINK_ID_ACK"; + case regSendTag: return "REG_SEND"; |