summaryrefslogtreecommitdiff
path: root/lib/jinterface/java_src/com
diff options
context:
space:
mode:
Diffstat (limited to 'lib/jinterface/java_src/com')
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java302
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java13
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/Link.java11
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/Links.java132
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/Makefile7
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java76
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java45
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java12
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPid.java6
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPort.java6
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangRef.java7
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java14
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMbox.java128
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMsg.java52
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java50
15 files changed, 625 insertions, 236 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
index 0bf3ca2a67..fb7c6869b7 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
@@ -76,6 +76,8 @@ public abstract class AbstractConnection extends Thread {
protected static final int exitTTTag = 13;
protected static final int regSendTTTag = 16;
protected static final int exit2TTTag = 18;
+ protected static final int unlinkIdTag = 35;
+ protected static final int unlinkIdAckTag = 36;
// MD5 challenge messsage tags
protected static final int ChallengeReply = 'r';
@@ -147,21 +149,6 @@ public abstract class AbstractConnection extends Thread {
if (traceLevel >= handshakeThreshold) {
System.out.println("<- ACCEPT FROM " + s);
}
-
- // get his info
- recvName(peer);
-
- // now find highest common dist value
- if (peer.proto != self.proto || self.distHigh < peer.distLow
- || self.distLow > peer.distHigh) {
- close();
- throw new IOException(
- "No common protocol found - cannot accept connection");
- }
- // highest common version: min(peer.distHigh, self.distHigh)
- peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh
- : peer.distHigh;
-
doAccept();
name = peer.node();
}
@@ -370,10 +357,8 @@ public abstract class AbstractConnection extends Thread {
// link to pid
/**
- * Create a link between the local node and the specified process on the
- * remote node. If the link is still active when the remote process
- * terminates, an exit signal will be sent to this connection. Use
- * {@link #sendUnlink unlink()} to remove the link.
+ *
+ * Send link signal to remote process.
*
* @param dest
* the Erlang PID of the remote process.
@@ -408,9 +393,8 @@ public abstract class AbstractConnection extends Thread {
}
/**
- * Remove a link between the local node and the specified process on the
- * remote node. This method deactivates links created with {@link #sendLink
- * link()}.
+ *
+ * Send unlink signal to remote process.
*
* @param dest
* the Erlang PID of the remote process.
@@ -419,7 +403,8 @@ public abstract class AbstractConnection extends Thread {
* if the connection is not active or a communication error
* occurs.
*/
- protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest)
+ protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest,
+ long unlink_id)
throws IOException {
if (!connected) {
throw new IOException("Not connected");
@@ -432,11 +417,29 @@ public abstract class AbstractConnection extends Thread {
header.write1(passThrough);
header.write1(version);
- // header
- header.write_tuple_head(3);
- header.write_long(unlinkTag);
- header.write_any(from);
- header.write_any(dest);
+ if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) {
+ // header
+ header.write_tuple_head(4);
+ header.write_long(unlinkIdTag);
+ header.write_long(unlink_id);
+ header.write_any(from);
+ header.write_any(dest);
+ }
+ else {
+ /*
+ * A node that isn't capable of talking the new link protocol.
+ *
+ * Send an old unlink op, and send ourselves an unlink-ack. We may
+ * end up in an inconsistent state as we could before the new link
+ * protocol was introduced...
+ */
+ // header
+ header.write_tuple_head(3);
+ header.write_long(unlinkTag);
+ header.write_any(from);
+ header.write_any(dest);
+ deliver(new OtpMsg(unlinkIdAckTag, dest, from, unlink_id));
+ }
// fix up length in preamble
header.poke4BE(0, header.size() - 4);
@@ -444,6 +447,45 @@ public abstract class AbstractConnection extends Thread {
do_send(header);
}
+ /**
+ * Send unlink acknowledgment signal to remote process.
+ *
+ * @param dest
+ * the Erlang PID of the remote process.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication error
+ * occurs.
+ */
+ protected void sendUnlinkAck(final OtpErlangPid from, final OtpErlangPid dest,
+ long unlink_id)
+ throws IOException {
+ if (!connected) {
+ throw new IOException("Not connected");
+ }
+ if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) {
+ @SuppressWarnings("resource")
+ final OtpOutputStream header = new OtpOutputStream(headerLen);
+
+ // preamble: 4 byte length + "passthrough" tag
+ header.write4BE(0); // reserve space for length
+ header.write1(passThrough);
+ header.write1(version);
+
+ // header
+ header.write_tuple_head(4);
+ header.write_long(unlinkIdAckTag);
+ header.write_long(unlink_id);
+ header.write_any(from);
+ header.write_any(dest);
+ // fix up length in preamble
+ header.poke4BE(0, header.size() - 4);
+
+ do_send(header);
+ }
+
+ }
+
/* used internally when "processes" terminate */
protected void sendExit(final OtpErlangPid from, final OtpErlangPid dest,
final OtpErlangObject reason) throws IOException {
@@ -700,7 +742,21 @@ public abstract class AbstractConnection extends Thread {
from = (OtpErlangPid) head.elementAt(1);
to = (OtpErlangPid) head.elementAt(2);
- deliver(new OtpMsg(tag, from, to));
+ deliver(new OtpMsg(tag, from, to, 0));
+ break;
+
+ case unlinkIdTag: // { UNLINK_ID, UnlinkId, FromPid, ToPid}
+ case unlinkIdAckTag: // { UNLINK_ID_Ack, UnlinkId, FromPid, ToPid}
+ if (traceLevel >= ctrlThreshold) {
+ System.out.println("<- " + headerType(head) + " "
+ + head);
+ }
+
+ long unlink_id = ((OtpErlangLong) head.elementAt(1)).longValue();
+ from = (OtpErlangPid) head.elementAt(2);
+ to = (OtpErlangPid) head.elementAt(3);
+
+ deliver(new OtpMsg(tag, from, to, unlink_id));
break;
// absolutely no idea what to do with these, so we ignore
@@ -892,6 +948,12 @@ public abstract class AbstractConnection extends Thread {
case unlinkTag:
return "UNLINK";
+ case unlinkIdTag:
+ return "UNLINK_ID";
+
+ case unlinkIdAckTag:
+ return "UNLINK_ID_ACK";
+
case regSendTag:
return "REG_SEND";
@@ -953,10 +1015,12 @@ public abstract class AbstractConnection extends Thread {
}
protected void doAccept() throws IOException, OtpAuthException {
+ final int send_name_tag = recvName(peer);
try {
sendStatus("ok");
final int our_challenge = genChallenge();
- sendChallenge(peer.distChoose, localNode.flags, our_challenge);
+ sendChallenge(peer.flags, localNode.flags, our_challenge);
+ recvComplement(send_name_tag);
final int her_challenge = recvChallengeReply(our_challenge);
final byte[] our_digest = genDigest(her_challenge,
localNode.cookie());
@@ -992,12 +1056,14 @@ public abstract class AbstractConnection extends Thread {
System.out.println("-> MD5 CONNECT TO " + peer.host() + ":"
+ port);
}
- sendName(peer.distChoose, localNode.flags);
+ final int send_name_tag = sendName(peer.distChoose, localNode.flags,
+ localNode.creation);
recvStatus();
final int her_challenge = recvChallenge();
final byte[] our_digest = genDigest(her_challenge,
localNode.cookie());
final int our_challenge = genChallenge();
+ sendComplement(send_name_tag);
sendChallengeReply(our_challenge, our_digest);
recvChallengeAck(our_challenge);
cookieOk = true;
@@ -1070,17 +1136,31 @@ public abstract class AbstractConnection extends Thread {
return res;
}
- protected void sendName(final int dist, final int aflags)
+ protected int sendName(final int dist, final long aflags,
+ final int creation)
throws IOException {
@SuppressWarnings("resource")
final OtpOutputStream obuf = new OtpOutputStream();
final String str = localNode.node();
- obuf.write2BE(str.length() + 7); // 7 bytes + nodename
- obuf.write1(AbstractNode.NTYPE_R6);
- obuf.write2BE(dist);
- obuf.write4BE(aflags);
- obuf.write(str.getBytes());
+ int send_name_tag;
+ if (dist == 5) {
+ obuf.write2BE(1+2+4 + str.length());
+ send_name_tag = 'n';
+ obuf.write1(send_name_tag);
+ obuf.write2BE(dist);
+ obuf.write4BE(aflags);
+ obuf.write(str.getBytes());
+ }
+ else {
+ obuf.write2BE(1+8+4+2 + str.length());
+ send_name_tag = 'N';
+ obuf.write1(send_name_tag);
+ obuf.write8BE(aflags);
+ obuf.write4BE(creation);
+ obuf.write2BE(str.length());
+ obuf.write(str.getBytes());
+ }
obuf.writeToAndFlush(socket.getOutputStream());
@@ -1088,26 +1168,61 @@ public abstract class AbstractConnection extends Thread {
System.out.println("-> " + "HANDSHAKE sendName" + " flags="
+ aflags + " dist=" + dist + " local=" + localNode);
}
+ return send_name_tag;
+ }
+
+ protected void sendComplement(final int send_name_tag)
+ throws IOException {
+
+ if (send_name_tag == 'n' &&
+ (peer.flags & AbstractNode.dFlagHandshake23) != 0) {
+ @SuppressWarnings("resource")
+ final OtpOutputStream obuf = new OtpOutputStream();
+ obuf.write2BE(1+4+4);
+ obuf.write1('c');
+ final int flagsHigh = (int)(localNode.flags >> 32);
+ obuf.write4BE(flagsHigh);
+ obuf.write4BE(localNode.creation);
+
+ obuf.writeToAndFlush(socket.getOutputStream());
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("-> " + "HANDSHAKE sendComplement" +
+ " flagsHigh=" + flagsHigh +
+ " creation=" + localNode.creation);
+ }
+ }
}
- protected void sendChallenge(final int dist, final int aflags,
- final int challenge) throws IOException {
+ protected void sendChallenge(final long her_flags, final long our_flags,
+ final int challenge) throws IOException {
@SuppressWarnings("resource")
final OtpOutputStream obuf = new OtpOutputStream();
final String str = localNode.node();
- obuf.write2BE(str.length() + 11); // 11 bytes + nodename
- obuf.write1(AbstractNode.NTYPE_R6);
- obuf.write2BE(dist);
- obuf.write4BE(aflags);
- obuf.write4BE(challenge);
- obuf.write(str.getBytes());
+ if ((her_flags & AbstractNode.dFlagHandshake23) == 0) {
+ obuf.write2BE(1+2+4+4 + str.length());
+ obuf.write1('n');
+ obuf.write2BE(5);
+ obuf.write4BE(our_flags & 0xffffffff);
+ obuf.write4BE(challenge);
+ obuf.write(str.getBytes());
+ }
+ else {
+ obuf.write2BE(1+8+4+4+2 + str.length());
+ obuf.write1('N');
+ obuf.write8BE(our_flags);
+ obuf.write4BE(challenge);
+ obuf.write4BE(localNode.creation);
+ obuf.write2BE(str.length());
+ obuf.write(str.getBytes());
+ }
obuf.writeToAndFlush(socket.getOutputStream());
if (traceLevel >= handshakeThreshold) {
System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags="
- + aflags + " dist=" + dist + " challenge=" + challenge
+ + our_flags + " challenge=" + challenge
+ " local=" + localNode);
}
}
@@ -1127,8 +1242,8 @@ public abstract class AbstractConnection extends Thread {
return tmpbuf;
}
- protected void recvName(final OtpPeer apeer) throws IOException {
-
+ protected int recvName(final OtpPeer apeer) throws IOException {
+ int send_name_tag;
String hisname = "";
try {
@@ -1137,25 +1252,31 @@ public abstract class AbstractConnection extends Thread {
final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0);
byte[] tmpname;
final int len = tmpbuf.length;
- apeer.ntype = ibuf.read1();
- if (apeer.ntype != AbstractNode.NTYPE_R6) {
- throw new IOException("Unknown remote node type");
- }
- apeer.distLow = apeer.distHigh = ibuf.read2BE();
- if (apeer.distLow < 5) {
+ send_name_tag = ibuf.read1();
+ switch (send_name_tag) {
+ case 'n':
+ apeer.distLow = apeer.distHigh = ibuf.read2BE();
+ if (apeer.distLow != 5)
+ throw new IOException("Invalid handshake version");
+ apeer.flags = ibuf.read4BE();
+ tmpname = new byte[len - 7];
+ ibuf.readN(tmpname);
+ hisname = OtpErlangString.newString(tmpname);
+ break;
+ case 'N':
+ apeer.distLow = apeer.distHigh = 6;
+ apeer.flags = ibuf.read8BE();
+ if ((apeer.flags & AbstractNode.dFlagHandshake23) == 0)
+ throw new IOException("Missing DFLAG_HANDSHAKE_23");
+ apeer.creation = ibuf.read4BE();
+ int namelen = ibuf.read2BE();
+ tmpname = new byte[namelen];
+ ibuf.readN(tmpname);
+ hisname = OtpErlangString.newString(tmpname);
+ break;
+ default:
throw new IOException("Unknown remote node type");
}
- apeer.flags = ibuf.read4BE();
- tmpname = new byte[len - 7];
- ibuf.readN(tmpname);
- hisname = OtpErlangString.newString(tmpname);
- // Set the old nodetype parameter to indicate hidden/normal status
- // When the old handshake is removed, the ntype should also be.
- if ((apeer.flags & AbstractNode.dFlagPublished) != 0) {
- apeer.ntype = AbstractNode.NTYPE_R4_ERLANG;
- } else {
- apeer.ntype = AbstractNode.NTYPE_R4_HIDDEN;
- }
if ((apeer.flags & AbstractNode.dFlagExtendedReferences) == 0) {
throw new IOException(
@@ -1180,6 +1301,7 @@ public abstract class AbstractConnection extends Thread {
System.out.println("<- " + "HANDSHAKE" + " ntype=" + apeer.ntype
+ " dist=" + apeer.distHigh + " remote=" + apeer);
}
+ return send_name_tag;
}
protected int recvChallenge() throws IOException {
@@ -1190,14 +1312,31 @@ public abstract class AbstractConnection extends Thread {
final byte[] buf = read2BytePackage();
@SuppressWarnings("resource")
final OtpInputStream ibuf = new OtpInputStream(buf, 0);
- peer.ntype = ibuf.read1();
- if (peer.ntype != AbstractNode.NTYPE_R6) {
+ int namelen;
+ switch (ibuf.read1()) {
+ case 'n':
+ if (peer.distChoose != 5)
+ throw new IOException("Old challenge wrong version");
+ peer.distLow = peer.distHigh = ibuf.read2BE();
+ peer.flags = ibuf.read4BE();
+ if ((peer.flags & AbstractNode.dFlagHandshake23) != 0)
+ throw new IOException("Old challenge unexpected DFLAG_HANDHAKE_23");
+ challenge = ibuf.read4BE();
+ namelen = buf.length - (1+2+4+4);
+ break;
+ case 'N':
+ peer.distLow = peer.distHigh = peer.distChoose = 6;
+ peer.flags = ibuf.read8BE();
+ if ((peer.flags & AbstractNode.dFlagHandshake23) == 0)
+ throw new IOException("New challenge missing DFLAG_HANDHAKE_23");
+ challenge = ibuf.read4BE();
+ peer.creation = ibuf.read4BE();
+ namelen = ibuf.read2BE();
+ break;
+ default:
throw new IOException("Unexpected peer type");
}
- peer.distLow = peer.distHigh = ibuf.read2BE();
- peer.flags = ibuf.read4BE();
- challenge = ibuf.read4BE();
- final byte[] tmpname = new byte[buf.length - 11];
+ final byte[] tmpname = new byte[namelen];
ibuf.readN(tmpname);
final String hisname = OtpErlangString.newString(tmpname);
if (!hisname.equals(peer.node)) {
@@ -1228,6 +1367,27 @@ public abstract class AbstractConnection extends Thread {
return challenge;
}
+ protected void recvComplement(int send_name_tag) throws IOException {
+
+ if (send_name_tag == 'n' &&
+ (peer.flags & AbstractNode.dFlagHandshake23) != 0) {
+ try {
+ final byte[] tmpbuf = read2BytePackage();
+ @SuppressWarnings("resource")
+ final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0);
+ if (ibuf.read1() != 'c')
+ throw new IOException("Not a complement tag");
+
+ final long flagsHigh = ibuf.read4BE();
+ peer.flags |= flagsHigh << 32;
+ peer.creation = ibuf.read4BE();
+
+ } catch (final OtpErlangDecodeException e) {
+ throw new IOException("Handshake failed - not enough data");
+ }
+ }
+ }
+
protected void sendChallengeReply(final int challenge, final byte[] digest)
throws IOException {
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java
index c3f71a84f0..09add55819 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java
@@ -74,10 +74,7 @@ public class AbstractNode implements OtpTransportFactory {
static String defaultCookie = null;
final OtpTransportFactory transportFactory;
- // Node types
static final int NTYPE_R6 = 110; // 'n' post-r5, all nodes
- static final int NTYPE_R4_ERLANG = 109; // 'm' Only for source compatibility
- static final int NTYPE_R4_HIDDEN = 104; // 'h' Only for source compatibility
// Node capability flags
static final int dFlagPublished = 1;
@@ -96,17 +93,21 @@ public class AbstractNode implements OtpTransportFactory {
static final int dFlagUtf8Atoms = 0x10000;
static final int dFlagMapTag = 0x20000;
static final int dFlagBigCreation = 0x40000;
+ static final int dFlagHandshake23 = 0x1000000;
+ static final int dFlagUnlinkId = 0x2000000;
int ntype = NTYPE_R6;
int proto = 0; // tcp/ip
- int distHigh = 5; // Cannot talk to nodes before R6
+ int distHigh = 6;
int distLow = 5; // Cannot talk to nodes before R6
int creation = 0;
- int flags = dFlagExtendedReferences | dFlagExtendedPidsPorts
+ long flags = dFlagExtendedReferences | dFlagExtendedPidsPorts
| dFlagBitBinaries | dFlagNewFloats | dFlagFunTags
| dflagNewFunTags | dFlagUtf8Atoms | dFlagMapTag
| dFlagExportPtrTag
- | dFlagBigCreation;
+ | dFlagBigCreation
+ | dFlagHandshake23
+ | dFlagUnlinkId;
/* initialize hostname and default cookie */
static {
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/Link.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/Link.java
index 18aa825759..78890d1cde 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/Link.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/Link.java
@@ -23,11 +23,14 @@ package com.ericsson.otp.erlang;
class Link {
private final OtpErlangPid local;
private final OtpErlangPid remote;
+ private long unlinking = 0;
private int hashCodeValue = 0;
public Link(final OtpErlangPid local, final OtpErlangPid remote) {
this.local = local;
this.remote = remote;
+ this.unlinking = 0;
+
}
public OtpErlangPid local() {
@@ -47,6 +50,14 @@ class Link {
|| local.equals(aremote) && remote.equals(alocal);
}
+ public long getUnlinking() {
+ return this.unlinking;
+ }
+
+ public void setUnlinking(long unlink_id) {
+ this.unlinking = unlink_id;
+ }
+
@Override
public int hashCode() {
if (hashCodeValue == 0) {
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/Links.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/Links.java
index 5f1bd40e76..032c7480af 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/Links.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/Links.java
@@ -23,6 +23,7 @@ package com.ericsson.otp.erlang;
class Links {
Link[] links;
int count;
+ int active;
Links() {
this(10);
@@ -31,68 +32,132 @@ class Links {
Links(final int initialSize) {
links = new Link[initialSize];
count = 0;
+ active = 0;
}
- synchronized void addLink(final OtpErlangPid local,
- final OtpErlangPid remote) {
- if (find(local, remote) == -1) {
+ // Try add link and return if it was added or not...
+ // If already existing it will not be added again.
+ // If force is true it is added even if it is
+ // currently unlinking; otherwise not.
+ synchronized boolean addLink(final OtpErlangPid local,
+ final OtpErlangPid remote,
+ final boolean force) {
+ int i = find(local, remote);
+ if (i != -1) {
+ if (links[i].getUnlinking() != 0 && force) {
+ links[i].setUnlinking(0);
+ active++;
+ return true;
+ }
+ return false;
+ }
+ else {
if (count >= links.length) {
final Link[] tmp = new Link[count * 2];
System.arraycopy(links, 0, tmp, 0, count);
links = tmp;
}
links[count++] = new Link(local, remote);
+ active++;
+ return true;
}
}
- synchronized void removeLink(final OtpErlangPid local,
- final OtpErlangPid remote) {
+ // Try remove link and return whether it was active or not...
+ synchronized boolean removeLink(final OtpErlangPid local,
+ final OtpErlangPid remote) {
int i;
if ((i = find(local, remote)) != -1) {
+ long unlinking = links[i].getUnlinking();
count--;
links[i] = links[count];
links[count] = null;
+ if (unlinking == 0) {
+ active--;
+ return true;
+ }
}
+ return false;
}
- synchronized boolean exists(final OtpErlangPid local,
- final OtpErlangPid remote) {
- return find(local, remote) != -1;
+ // Try remove active link and return whether it was removed or not...
+ synchronized boolean removeActiveLink(final OtpErlangPid local,
+ final OtpErlangPid remote) {
+ int i;
+
+ if ((i = find(local, remote)) != -1) {
+ long unlinking = links[i].getUnlinking();
+ if (unlinking != 0)
+ return false;
+ count--;
+ active--;
+ links[i] = links[count];
+ links[count] = null;
+ return true;
+ }
+ return false;
}
- synchronized int find(final OtpErlangPid local, final OtpErlangPid remote) {
- for (int i = 0; i < count; i++) {
- if (links[i].equals(local, remote)) {
- return i;
- }
+ // Remove link if unlink_id match and return whether it was removed or not...
+ synchronized boolean removeUnlinkingLink(final OtpErlangPid local,
+ final OtpErlangPid remote,
+ final long unlink_id) {
+ int i;
+
+ if (unlink_id == 0) {
+ return false;
}
- return -1;
+
+ if ((i = find(local, remote)) != -1) {
+ long unlinking = links[i].getUnlinking();
+ if (unlinking != unlink_id)
+ return false;
+ count--;
+ links[i] = links[count];
+ links[count] = null;
+ return true;
+ }
+ return false;
}
- int count() {
- return count;
+ synchronized boolean setUnlinking(final OtpErlangPid local,
+ final OtpErlangPid remote,
+ final long unlink_id) {
+ int i;
+
+ if (unlink_id == 0) {
+ return false;
+ }
+
+ if ((i = find(local, remote)) != -1) {
+ if (links[i].getUnlinking() == 0) {
+ links[i].setUnlinking(unlink_id);
+ active--;
+ return true;
+ }
+ }
+ return false;
}
- /* all local pids get notified about broken connection */
- synchronized OtpErlangPid[] localPids() {
- OtpErlangPid[] ret = null;
- if (count != 0) {
- ret = new OtpErlangPid[count];
- for (int i = 0; i < count; i++) {
- ret[i] = links[i].local();
+ synchronized int find(final OtpErlangPid local, final OtpErlangPid remote) {
+ for (int i = 0; i < count; i++) {
+ if (links[i].equals(local, remote)) {
+ return i;
}
}
- return ret;
+ return -1;
}
- /* all remote pids get notified about failed pid */
synchronized OtpErlangPid[] remotePids() {
OtpErlangPid[] ret = null;
- if (count != 0) {
- ret = new OtpErlangPid[count];
- for (int i = 0; i < count; i++) {
- ret[i] = links[i].remote();
+ if (active != 0) {
+ int a = 0;
+ ret = new OtpErlangPid[active];
+ for (int i = 0; a < active; i++) {
+ if (links[i].getUnlinking() == 0) {
+ ret[a++] = links[i].remote();
+ }
}
}
return ret;
@@ -112,13 +177,4 @@ class Links {
return ret;
}
- /* returns a copy of the link table */
- synchronized Link[] links() {
- Link[] ret = null;
- if (count != 0) {
- ret = new Link[count];
- System.arraycopy(links, 0, ret, 0, count);
- }
- return ret;
- }
}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/Makefile b/lib/jinterface/java_src/com/ericsson/otp/erlang/Makefile
index ee616f3d7e..bcbb206db6 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/Makefile
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/Makefile
@@ -67,8 +67,11 @@ JARFILE= OtpErlang.jar
# ----------------------------------------------------
# Programs and Flags
# ----------------------------------------------------
-
-JAR= jar
+ifeq ($(TARGET),win32)
+ JAR=jar.exe
+else
+ JAR=jar
+endif
CLASSPATH = $(JAVA_SRC_ROOT)
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
index eb3eaa1f15..0cfb1bb39d 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
@@ -49,6 +49,8 @@ import java.io.IOException;
public class OtpConnection extends AbstractConnection {
protected OtpSelf self;
protected GenericQueue queue; // messages get delivered here
+ protected Links links;
+ private long unlink_id;
/*
* Accept an incoming connection from a remote node. Used by {@link
@@ -96,7 +98,58 @@ public class OtpConnection extends AbstractConnection {
@Override
public void deliver(final OtpMsg msg) {
- queue.put(msg);
+ switch (msg.type()) {
+ case OtpMsg.exitTag:
+ case OtpMsg.linkTag:
+ case OtpMsg.unlinkTag:
+ case AbstractConnection.unlinkIdTag:
+ case AbstractConnection.unlinkIdAckTag:
+ handle_link_operation(msg);
+ break;
+ default:
+ queue.put(msg);
+ break;
+ }
+ }
+
+ private synchronized void handle_link_operation(final OtpMsg m) {
+ final OtpErlangPid remote = m.getSenderPid();
+ switch (m.type()) {
+ case OtpMsg.linkTag:
+ // only queue up link-message if link was added...
+ if (links.addLink(self.pid(), remote, false)) {
+ queue.put(m);
+ }
+ break;
+
+ case OtpMsg.unlinkTag:
+ case AbstractConnection.unlinkIdTag: {
+ final long unlink_id = m.getUnlinkId();
+ // only queue up unlink-message if link was removed...
+ if (links.removeActiveLink(self.pid(), remote)) {
+ // Use old unlinkTag without unlink id for
+ // backwards compatibility...
+ queue.put(new OtpMsg(OtpMsg.unlinkTag, self.pid(),
+ remote));
+ }
+ try {
+ super.sendUnlinkAck(self.pid(), remote, unlink_id);
+ } catch (final Exception e) {
+ }
+ break;
+ }
+
+ case AbstractConnection.unlinkIdAckTag:
+ links.removeUnlinkingLink(self.pid(), remote, m.getUnlinkId());
+ break;
+
+ case OtpMsg.exitTag:
+ // only queue up exit-message if link was removed...
+ if (links.removeActiveLink(self.pid(), remote)) {
+ queue.put(m);
+ }
+ break;
+ }
}
/**
@@ -544,7 +597,14 @@ public class OtpConnection extends AbstractConnection {
* occurs.
*/
public void link(final OtpErlangPid dest) throws IOException {
- super.sendLink(self.pid(), dest);
+ if (links.addLink(self.pid(), dest, true)) {
+ try {
+ super.sendLink(self.pid(), dest);
+ } catch (final IOException e) {
+ links.removeLink(self.pid(), dest); // restore...
+ throw e;
+ }
+ }
}
/**
@@ -560,7 +620,17 @@ public class OtpConnection extends AbstractConnection {
* occurs.
*/
public void unlink(final OtpErlangPid dest) throws IOException {
- super.sendUnlink(self.pid(), dest);
+ long unlink_id = this.unlink_id++;
+ if (unlink_id == 0)
+ unlink_id = this.unlink_id++;
+ if (links.setUnlinking(self.pid(), dest, unlink_id)) {
+ try {
+ super.sendUnlink(self.pid(), dest, unlink_id);
+ } catch (final IOException e) {
+ links.addLink(self.pid(), dest, true); // restore...
+ throw e;
+ }
+ }
}
/**
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java
index 011709beab..70ecc5b695 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java
@@ -122,9 +122,7 @@ public class OtpCookedConnection extends AbstractConnection {
switch (msg.type()) {
case OtpMsg.linkTag:
- if (delivered) {
- links.addLink(msg.getRecipientPid(), msg.getSenderPid());
- } else {
+ if (!delivered) {
try {
// no such pid - send exit to sender
super.sendExit(msg.getRecipientPid(), msg.getSenderPid(),
@@ -133,13 +131,7 @@ public class OtpCookedConnection extends AbstractConnection {
}
}
break;
-
- case OtpMsg.unlinkTag:
- case OtpMsg.exitTag:
- links.removeLink(msg.getRecipientPid(), msg.getSenderPid());
- break;
-
- case OtpMsg.exit2Tag:
+ default:
break;
}
@@ -200,30 +192,39 @@ public class OtpCookedConnection extends AbstractConnection {
}
}
- /*
- * snoop for outgoing links and update own table
- */
- synchronized void link(final OtpErlangPid from, final OtpErlangPid to)
- throws OtpErlangExit {
+ void link(final OtpErlangPid from, final OtpErlangPid to)
+ throws OtpErlangExit {
try {
super.sendLink(from, to);
- links.addLink(from, to);
} catch (final IOException e) {
throw new OtpErlangExit("noproc", to);
}
}
- /*
- * snoop for outgoing unlinks and update own table
- */
- synchronized void unlink(final OtpErlangPid from, final OtpErlangPid to) {
- links.removeLink(from, to);
+ void unlink(final OtpErlangPid from, final OtpErlangPid to, final long unlink_id) {
+ try {
+ super.sendUnlink(from, to , unlink_id);
+ } catch (final IOException e) {
+ }
+ }
+
+ void unlink_ack(final OtpErlangPid from, final OtpErlangPid to, final long unlink_id) {
try {
- super.sendUnlink(from, to);
+ super.sendUnlinkAck(from, to , unlink_id);
} catch (final IOException e) {
}
}
+ synchronized void node_link(OtpErlangPid local, OtpErlangPid remote, boolean add)
+ {
+ if (add) {
+ links.addLink(local, remote, true);
+ }
+ else {
+ links.removeLink(local, remote);
+ }
+ }
+
/*
* When the connection fails - send exit to all local pids with links
* through this connection
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java
index fffb8475d3..008ee9727e 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java
@@ -74,8 +74,9 @@ public class OtpEpmd {
private static final byte port4req = (byte) 122;
private static final byte port4resp = (byte) 119;
- private static final byte publish4req = (byte) 120;
- private static final byte publish4resp = (byte) 121;
+ private static final byte ALIVE2_REQ = (byte) 120;
+ private static final byte ALIVE2_RESP = (byte) 121;
+ private static final byte ALIVE2_X_RESP = (byte) 118;
private static final byte names4req = (byte) 110;
private static int traceLevel = 0;
@@ -287,7 +288,7 @@ public class OtpEpmd {
obuf.write2BE(node.alive().length() + 13);
- obuf.write1(publish4req);
+ obuf.write1(ALIVE2_REQ);
obuf.write2BE(node.port());
obuf.write1(node.type());
@@ -322,10 +323,11 @@ public class OtpEpmd {
final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0);
final int response = ibuf.read1();
- if (response == publish4resp) {
+ if (response == ALIVE2_RESP || response == ALIVE2_X_RESP) {
final int result = ibuf.read1();
if (result == 0) {
- node.creation = ibuf.read2BE();
+ node.creation = (response == ALIVE2_RESP
+ ? ibuf.read2BE() : ibuf.read4BE());
if (traceLevel >= traceThreshold) {
System.out.println("<- OK");
}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPid.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPid.java
index 9cbd735751..3abdf9535f 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPid.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPid.java
@@ -27,7 +27,6 @@ public class OtpErlangPid extends OtpErlangObject implements Comparable<Object>
// don't change this!
private static final long serialVersionUID = 1664394142301803659L;
- private final int tag;
private final String node;
private final int id;
private final int serial;
@@ -45,7 +44,6 @@ public class OtpErlangPid extends OtpErlangObject implements Comparable<Object>
public OtpErlangPid(final OtpLocalNode self) {
final OtpErlangPid p = self.createPid();
- tag = p.tag;
id = p.id;
serial = p.serial;
creation = p.creation;
@@ -67,7 +65,6 @@ public class OtpErlangPid extends OtpErlangObject implements Comparable<Object>
throws OtpErlangDecodeException {
final OtpErlangPid p = buf.read_pid();
- tag = p.tag;
node = p.node();
id = p.id();
serial = p.serial();
@@ -118,7 +115,6 @@ public class OtpErlangPid extends OtpErlangObject implements Comparable<Object>
*/
protected OtpErlangPid(final int tag, final String node, final int id,
final int serial, final int creation) {
- this.tag = tag;
this.node = node;
if (tag == OtpExternal.pidTag) {
this.id = id & 0x7fff; // 15 bits
@@ -133,7 +129,7 @@ public class OtpErlangPid extends OtpErlangObject implements Comparable<Object>
}
protected int tag() {
- return tag;
+ return OtpExternal.newPidTag;
}
/**
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPort.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPort.java
index 79b5d2736c..c8648d7aa3 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPort.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangPort.java
@@ -26,7 +26,6 @@ public class OtpErlangPort extends OtpErlangObject {
// don't change this!
private static final long serialVersionUID = 4037115468007644704L;
- private final int tag;
private final String node;
private final int id;
private final int creation;
@@ -43,7 +42,6 @@ public class OtpErlangPort extends OtpErlangObject {
private OtpErlangPort(final OtpSelf self) {
final OtpErlangPort p = self.createPort();
- tag = p.tag;
id = p.id;
creation = p.creation;
node = p.node;
@@ -64,7 +62,6 @@ public class OtpErlangPort extends OtpErlangObject {
throws OtpErlangDecodeException {
final OtpErlangPort p = buf.read_port();
- tag = p.tag;
node = p.node();
id = p.id();
creation = p.creation();
@@ -105,7 +102,6 @@ public class OtpErlangPort extends OtpErlangObject {
*/
public OtpErlangPort(final int tag, final String node, final int id,
final int creation) {
- this.tag = tag;
this.node = node;
if (tag == OtpExternal.portTag) {
this.id = id & 0xfffffff; // 28 bits
@@ -118,7 +114,7 @@ public class OtpErlangPort extends OtpErlangObject {
}
protected int tag() {
- return tag;
+ return OtpExternal.newPortTag;
}
/**
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangRef.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangRef.java
index 2165397013..2bf8d9a56b 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangRef.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpErlangRef.java
@@ -28,7 +28,6 @@ public class OtpErlangRef extends OtpErlangObject {
// don't change this!
private static final long serialVersionUID = -7022666480768586521L;
- private final int tag;
private final String node;
private final int creation;
@@ -49,7 +48,6 @@ public class OtpErlangRef extends OtpErlangObject {
public OtpErlangRef(final OtpLocalNode self) {
final OtpErlangRef r = self.createRef();
- tag = r.tag;
ids = r.ids;
creation = r.creation;
node = r.node;
@@ -70,7 +68,6 @@ public class OtpErlangRef extends OtpErlangObject {
throws OtpErlangDecodeException {
final OtpErlangRef r = buf.read_ref();
- tag = r.tag;
node = r.node();
creation = r.creation();
@@ -90,7 +87,6 @@ public class OtpErlangRef extends OtpErlangObject {
* another arbitrary number.
*/
public OtpErlangRef(final String node, final int id, final int creation) {
- this.tag = OtpExternal.newRefTag;
this.node = node;
ids = new int[1];
ids[0] = id & 0x3ffff; // 18 bits
@@ -138,7 +134,6 @@ public class OtpErlangRef extends OtpErlangObject {
*/
public OtpErlangRef(final int tag, final String node, final int[] ids,
final int creation) {
- this.tag = tag;
this.node = node;
// use at most 3 words
@@ -162,7 +157,7 @@ public class OtpErlangRef extends OtpErlangObject {
}
protected int tag() {
- return tag;
+ return OtpExternal.newerRefTag;
}
/**
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java
index 6d81ce630b..8cc5b3c21d 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java
@@ -239,6 +239,20 @@ public class OtpInputStream extends ByteArrayInputStream {
}
/**
+ * Read a eight byte big endian integer from the stream.
+ *
+ * @return the bytes read, converted from big endian to a long integer.
+ *
+ * @exception OtpErlangDecodeException
+ * if the next byte cannot be read.
+ */
+ public long read8BE() throws OtpErlangDecodeException {
+ long high = read4BE();
+ long low = read4BE();
+ return (high << 32) | (low & 0xffffffff);
+ }
+
+ /**
* Read a two byte little endian integer from the stream.
*
* @return the bytes read, converted from little endian to an integer.
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMbox.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMbox.java
index 29a8bc1540..3d46d21d60 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMbox.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMbox.java
@@ -84,6 +84,7 @@ public class OtpMbox {
GenericQueue queue;
String name;
Links links;
+ private long unlink_id;
// package constructor: called by OtpNode:createMbox(name)
// to create a named mbox
@@ -91,6 +92,7 @@ public class OtpMbox {
this.self = self;
this.home = home;
this.name = name;
+ this.unlink_id = 1;
queue = new GenericQueue();
links = new Links(10);
}
@@ -516,7 +518,10 @@ public class OtpMbox {
* or could not be reached.
*
*/
- public void link(final OtpErlangPid to) throws OtpErlangExit {
+ public synchronized void link(final OtpErlangPid to) throws OtpErlangExit {
+ if (!links.addLink(self, to, true))
+ return; /* Already linked... */
+
try {
final String node = to.node();
if (node.equals(home.node())) {
@@ -526,17 +531,18 @@ public class OtpMbox {
} else {
final OtpCookedConnection conn = home.getConnection(node);
if (conn != null) {
- conn.link(self, to);
+ conn.link(self, to); // may throw 'noproc'
+ conn.node_link(self, to, true);
} else {
throw new OtpErlangExit("noproc", to);
}
}
} catch (final OtpErlangExit e) {
+ links.removeLink(self, to);
throw e;
} catch (final Exception e) {
}
- links.addLink(self, to);
}
/**
@@ -552,25 +558,41 @@ public class OtpMbox {
* from.
*
*/
- public void unlink(final OtpErlangPid to) {
- links.removeLink(self, to);
-
- try {
- final String node = to.node();
- if (node.equals(home.node())) {
- home.deliver(new OtpMsg(OtpMsg.unlinkTag, self, to));
- } else {
- final OtpCookedConnection conn = home.getConnection(node);
- if (conn != null) {
- conn.unlink(self, to);
+ public synchronized void unlink(final OtpErlangPid to) {
+ long unlink_id = this.unlink_id++;
+ if (unlink_id == 0)
+ unlink_id = this.unlink_id++;
+ if (links.setUnlinking(self, to, unlink_id)) {
+ try {
+ final String node = to.node();
+ if (node.equals(home.node())) {
+ home.deliver(new OtpMsg(OtpMsg.unlinkTag, self, to));
+ } else {
+ final OtpCookedConnection conn = home.getConnection(node);
+ if (conn != null) {
+ conn.unlink(self, to, unlink_id);
+ }
}
+ } catch (final Exception e) {
}
- } catch (final Exception e) {
}
}
/**
* <p>
+ * Get information about all processes and/or mail boxes currently
+ * linked to this mail box.
+ * </p>
+ *
+ * @return an array of all pids currently linked to this mail box.
+ *
+ */
+ public synchronized OtpErlangPid[] linked() {
+ return links.remotePids();
+ }
+
+ /**
+ * <p>
* Create a connection to a remote node.
* </p>
*
@@ -688,40 +710,92 @@ public class OtpMbox {
* called by OtpNode to deliver message to this mailbox.
*
* About exit and exit2: both cause exception to be raised upon receive().
- * However exit (not 2) causes any link to be removed as well, while exit2
- * leaves any links intact.
+ * However exit (not 2) only has an effect if there exist a link.
*/
void deliver(final OtpMsg m) {
switch (m.type()) {
+ case OtpMsg.exitTag:
case OtpMsg.linkTag:
- links.addLink(self, m.getSenderPid());
+ case OtpMsg.unlinkTag:
+ case AbstractConnection.unlinkIdTag:
+ case AbstractConnection.unlinkIdAckTag:
+ handle_link_operation(m);
+ break;
+ default:
+ queue.put(m);
+ break;
+ }
+ }
+
+ private synchronized void handle_link_operation(final OtpMsg m) {
+ final OtpErlangPid remote = m.getSenderPid();
+ final String node = remote.node();
+ final boolean is_local = node.equals(home.node());
+ final OtpCookedConnection conn = is_local ? null : home.getConnection(node);
+
+ switch (m.type()) {
+ case OtpMsg.linkTag:
+ if (links.addLink(self, remote, false)) {
+ if (!is_local) {
+ if (conn != null)
+ conn.node_link(self, remote, true);
+ else {
+ links.removeLink(self, remote);
+ queue.put(new OtpMsg(OtpMsg.exitTag, remote, self,
+ new OtpErlangAtom("noconnection")));
+ }
+ }
+ }
break;
case OtpMsg.unlinkTag:
- links.removeLink(self, m.getSenderPid());
+ case AbstractConnection.unlinkIdTag: {
+ final long unlink_id = m.getUnlinkId();
+ final boolean removed = links.removeActiveLink(self, remote);
+ try {
+ if (is_local) {
+ home.deliver(new OtpMsg(AbstractConnection.unlinkIdAckTag,
+ self, remote, unlink_id));
+ } else if (conn != null) {
+ if (removed)
+ conn.node_link(self, remote, false);
+ conn.unlink_ack(self, remote, unlink_id);
+ }
+ } catch (final Exception e) {
+ }
break;
+ }
- case OtpMsg.exitTag:
- links.removeLink(self, m.getSenderPid());
- queue.put(m);
+ case AbstractConnection.unlinkIdAckTag:
+ links.removeUnlinkingLink(self, m.getSenderPid(), m.getUnlinkId());
break;
- case OtpMsg.exit2Tag:
- default:
- queue.put(m);
+ case OtpMsg.exitTag:
+ if (links.removeActiveLink(self, m.getSenderPid())) {
+ queue.put(m);
+ }
break;
}
}
// used to break all known links to this mbox
- void breakLinks(final OtpErlangObject reason) {
+ synchronized void breakLinks(final OtpErlangObject reason) {
final Link[] l = links.clearLinks();
if (l != null) {
final int len = l.length;
for (int i = 0; i < len; i++) {
- exit(1, l[i].remote(), reason);
+ if (l[i].getUnlinking() == 0) {
+ OtpErlangPid remote = l[i].remote();
+ final String node = remote.node();
+ if (!node.equals(home.node())) {
+ final OtpCookedConnection conn = home.getConnection(node);
+ if (conn != null)
+ conn.node_link(self, remote, false);
+ }
+ exit(1, remote, reason);
+ }
}
}
}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMsg.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMsg.java
index 5bbcf2ab9e..0f883d5deb 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMsg.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpMsg.java
@@ -68,6 +68,7 @@ public class OtpMsg {
protected OtpErlangPid from;
protected OtpErlangPid to;
protected String toName;
+ protected long unlink_id;
// send has receiver pid but no sender information
OtpMsg(final OtpErlangPid to, final OtpInputStream paybuf) {
@@ -77,6 +78,7 @@ public class OtpMsg {
toName = null;
this.paybuf = paybuf;
payload = null;
+ this.unlink_id = 0;
}
// send has receiver pid but no sender information
@@ -87,6 +89,7 @@ public class OtpMsg {
toName = null;
paybuf = null;
this.payload = payload;
+ this.unlink_id = 0;
}
// send_reg has sender pid and receiver name
@@ -98,6 +101,7 @@ public class OtpMsg {
to = null;
this.paybuf = paybuf;
payload = null;
+ this.unlink_id = 0;
}
// send_reg has sender pid and receiver name
@@ -109,6 +113,7 @@ public class OtpMsg {
to = null;
paybuf = null;
this.payload = payload;
+ this.unlink_id = 0;
}
// exit (etc) has from, to, reason
@@ -117,8 +122,10 @@ public class OtpMsg {
this.tag = tag;
this.from = from;
this.to = to;
+ this.unlink_id = 0;
paybuf = null;
payload = reason;
+ this.unlink_id = 0;
}
// special case when reason is an atom (i.e. most of the time)
@@ -129,19 +136,52 @@ public class OtpMsg {
this.to = to;
paybuf = null;
payload = new OtpErlangAtom(reason);
+ this.unlink_id = 0;
}
- // other message types (link, unlink)
+ // other message types (link and old unlink)
OtpMsg(final int tag, final OtpErlangPid from, final OtpErlangPid to) {
// convert TT-tags to equiv non-TT versions
- int atag = tag;
- if (tag > 10) {
- atag -= 10;
- }
+ this.tag = drop_tt_tag(tag);
+ this.from = from;
+ this.to = to;
+ this.unlink_id = 0;
+ }
- this.tag = atag;
+ // unlink
+ OtpMsg(final int tag, final OtpErlangPid from, final OtpErlangPid to,
+ final long unlink_id) {
+ // convert TT-tags to equiv non-TT versions
+ this.tag = drop_tt_tag(tag);
this.from = from;
this.to = to;
+ this.unlink_id = unlink_id;
+ }
+
+ private int drop_tt_tag(final int tag) {
+ switch (tag) {
+ case AbstractConnection.sendTTTag:
+ return OtpMsg.sendTag;
+ case AbstractConnection.exitTTTag:
+ return OtpMsg.exitTag;
+ case AbstractConnection.regSendTTTag:
+ return OtpMsg.regSendTag;
+ case AbstractConnection.exit2TTTag:
+ return OtpMsg.exit2Tag;
+ default:
+ return tag;
+ }
+ }
+
+ /**
+ * Get unlink identifier of an unlink or unlink acknowledgment
+ * message. For package internal use only.
+ *
+ * @return the serialized Erlang term contained in this message.
+ *
+ */
+ long getUnlinkId() {
+ return this.unlink_id;
}
/**
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java
index 5e777c1164..917e5baf3a 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java
@@ -713,7 +713,7 @@ public class OtpOutputStream extends ByteArrayOutputStream {
*/
public void write_pid(final String node, final int id, final int serial,
final int creation) {
- write1(OtpExternal.pidTag);
+ write1(OtpExternal.newPidTag);
write_atom(node);
write4BE(id & 0x7fff); // 15 bits
write4BE(serial & 0x1fff); // 13 bits
@@ -727,20 +727,11 @@ public class OtpOutputStream extends ByteArrayOutputStream {
* the pid
*/
public void write_pid(OtpErlangPid pid) {
- write1(pid.tag());
+ write1(OtpExternal.newPidTag);
write_atom(pid.node());
write4BE(pid.id());
write4BE(pid.serial());
- switch (pid.tag()) {
- case OtpExternal.pidTag:
- write1(pid.creation());
- break;
- case OtpExternal.newPidTag:
- write4BE(pid.creation());
- break;
- default:
- throw new AssertionError("Invalid pid tag " + pid.tag());
- }
+ write4BE(pid.creation());
}
@@ -758,7 +749,7 @@ public class OtpOutputStream extends ByteArrayOutputStream {
* be used.
*/
public void write_port(final String node, final int id, final int creation) {
- write1(OtpExternal.portTag);
+ write1(OtpExternal.newPortTag);
write_atom(node);
write4BE(id & 0xfffffff); // 28 bits
write1(creation & 0x3); // 2 bits
@@ -771,19 +762,10 @@ public class OtpOutputStream extends ByteArrayOutputStream {
* the port.
*/
public void write_port(OtpErlangPort port) {
- write1(port.tag());
+ write1(OtpExternal.newPortTag);
write_atom(port.node());
write4BE(port.id());
- switch (port.tag()) {
- case OtpExternal.portTag:
- write1(port.creation());
- break;
- case OtpExternal.newPortTag:
- write4BE(port.creation());
- break;
- default:
- throw new AssertionError("Invalid port tag " + port.tag());
- }
+ write4BE(port.creation());
}
/**
@@ -829,7 +811,7 @@ public class OtpOutputStream extends ByteArrayOutputStream {
arity = 3; // max 3 words in ref
}
- write1(OtpExternal.newRefTag);
+ write1(OtpExternal.newerRefTag);
// how many id values
write2BE(arity);
@@ -857,24 +839,12 @@ public class OtpOutputStream extends ByteArrayOutputStream {
int[] ids = ref.ids();
int arity = ids.length;
- write1(ref.tag());
+ write1(OtpExternal.newerRefTag);
write2BE(arity);
write_atom(ref.node());
+ write4BE(ref.creation());
- switch (ref.tag()) {
- case OtpExternal.newRefTag:
- write1(ref.creation());
- write4BE(ids[0] & 0x3ffff); // first word gets truncated to 18 bits
- break;
- case OtpExternal.newerRefTag:
- write4BE(ref.creation());
- write4BE(ids[0]); // full first word
- break;
- default:
- throw new AssertionError("Invalid ref tag " + ref.tag());
- }
-
- for (int i = 1; i < arity; i++) {
+ for (int i = 0; i < arity; i++) {
write4BE(ids[i]);
}
}