summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoreea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-24 23:09:41 +0000
committereea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-24 23:09:41 +0000
commitf69420de3b964a4f1de9b9c310ca9ced2461e24e (patch)
tree0f5d21862b09b96b4574fd520e7a25b92e1de059
parentad042247239d1bce56bf906816c585543cb804eb (diff)
downloadATCD-f69420de3b964a4f1de9b9c310ca9ced2461e24e.tar.gz
Updated source files for ASX.
-rw-r--r--java/JACE/ASX/IOCntlCmds.java38
-rw-r--r--java/JACE/ASX/IOCntlMsg.java124
-rw-r--r--java/JACE/ASX/MessageBlock.java447
-rw-r--r--java/JACE/ASX/MessageQueue.java633
-rw-r--r--java/JACE/ASX/MessageType.java102
-rw-r--r--java/JACE/ASX/Module.java246
-rw-r--r--java/JACE/ASX/Stream.java436
-rw-r--r--java/JACE/ASX/StreamHead.java123
-rw-r--r--java/JACE/ASX/StreamTail.java114
-rw-r--r--java/JACE/ASX/Task.java443
-rw-r--r--java/JACE/ASX/TaskFlags.java49
-rw-r--r--java/JACE/ASX/ThruTask.java44
-rw-r--r--java/JACE/ASX/TimeValue.java296
-rw-r--r--java/JACE/ASX/TimedWait.java157
-rw-r--r--java/JACE/ASX/TimeoutException.java37
-rw-r--r--java/JACE/ASX/package.html11
16 files changed, 3300 insertions, 0 deletions
diff --git a/java/JACE/ASX/IOCntlCmds.java b/java/JACE/ASX/IOCntlCmds.java
new file mode 100644
index 00000000000..3cc7c76256d
--- /dev/null
+++ b/java/JACE/ASX/IOCntlCmds.java
@@ -0,0 +1,38 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * TaskFlags.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+/**
+ * Constants used for IOCTL messages.
+ *
+ *@see JACE.ASX.IOCntlMsg
+ */
+public abstract class IOCntlCmds
+{
+ /** Set the low water mark. */
+ public static final int SET_LWM = 1;
+
+ /** Get the low water mark. */
+ public static final int GET_LWM = 2;
+
+ /** Set the high water mark. */
+ public static final int SET_HWM = 3;
+
+ /** Get the high water mark. */
+ public static final int GET_HWM = 4;
+
+ /** Link modules */
+ public static final int MOD_LINK = 5;
+
+ /** Unlink modules */
+ public static final int MOD_UNLINK = 6;
+}
diff --git a/java/JACE/ASX/IOCntlMsg.java b/java/JACE/ASX/IOCntlMsg.java
new file mode 100644
index 00000000000..4eea18b36f6
--- /dev/null
+++ b/java/JACE/ASX/IOCntlMsg.java
@@ -0,0 +1,124 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * IOCntlMsg.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+
+/**
+ * Data format for IOCTL messages.
+ */
+public class IOCntlMsg
+{
+
+ // = Initialization method.
+
+ /*
+ * Initialize the control message.
+ *@param c IOCntlCmd for the control message. Note that this should
+ * be of type IOCntlCmds
+ */
+ public IOCntlMsg (int c)
+ {
+ this.cmd_ = c;
+ }
+
+ // = Get/set methods
+
+ /*
+ * Get the command.
+ *@return the command.
+ */
+ public int cmd ()
+ {
+ return this.cmd_;
+ }
+
+ /*
+ * Set the command.
+ *@param c the command.
+ */
+ public void cmd (int c)
+ {
+ this.cmd_ = c;
+ }
+
+ /*
+ * Get the count.
+ *@return the count.
+ */
+ public int count ()
+ {
+ return this.count_;
+ }
+
+ /*
+ * Set the count.
+ *@param c the count.
+ */
+ public void count (int c)
+ {
+ this.count_ = c;
+ }
+
+ /*
+ * Get the error.
+ *@return the error.
+ */
+ public int error ()
+ {
+ return this.error_;
+ }
+
+ /*
+ * Set the error.
+ *@param e the error.
+ */
+ public void error (int e)
+ {
+ this.error_ = e;
+ }
+
+ /*
+ * Get the return value.
+ *@return the return value.
+ */
+ public int rval ()
+ {
+ return this.rval_;
+ }
+
+ /*
+ * Set the return value.
+ *@param r the return value.
+ */
+ public void rval (int r)
+ {
+ this.rval_ = r;
+ }
+
+ public String toString ()
+ {
+ return (new Integer (this.cmd_)).toString ();
+ }
+
+ private int cmd_;
+ // Command.
+
+ private int count_;
+ // Count.
+
+ private int error_;
+ // Error.
+
+ private int rval_;
+ // Return value
+}
diff --git a/java/JACE/ASX/MessageBlock.java b/java/JACE/ASX/MessageBlock.java
new file mode 100644
index 00000000000..4035e34a7bd
--- /dev/null
+++ b/java/JACE/ASX/MessageBlock.java
@@ -0,0 +1,447 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * MessageBlock.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+
+/**
+ * Object used to store messages in the ASX framework.<P>
+ *
+ * <tt>MessageBlock</tt> is modeled after the message data structures
+ * used in System V STREAMS. A <tt>MessageBlock</tt> is composed of
+ * one or more <tt>MessageBlock</tt>s that are linked together by
+ * <em>PREV</em> and <em>NEXT</em> pointers. In addition, a
+ * <tt>MessageBlock</tt> may also be linked to a chain of other
+ * <tt>MessageBlock</tt>s. This structure enables efficient manipulation
+ * of arbitrarily-large messages <em>without</em> incurring memory
+ * copying overhead.
+ *
+ *@see MessageQueue
+ */
+public class MessageBlock
+{
+ /**
+ * Create an empty Message Block
+ */
+ public MessageBlock ()
+ {
+ this (0);
+ }
+
+ /**
+ * Create an empty Message Block.
+ * Note that this assumes that type of MessageBlock is MB_DATA.
+ *@param size size of the Message Block to create.
+ */
+ public MessageBlock (int size)
+ {
+ // Note the explicit cast toString() is needed. For some strange
+ // reason, it fails otherwise if size == 0.
+ this ((new StringBuffer (size)).toString ());
+ }
+
+ /**
+ * Create a Message Block. Note that this assumes that type of
+ * MessageBlock is MB_DATA.
+ *@param data initial data to create a Message Block with.
+ */
+ public MessageBlock (String data)
+ {
+ this (MessageType.MB_DATA,
+ null,
+ data);
+ }
+
+ /**
+ * Create a Message Block.
+ *@param type type of the Message Block (must be one of those
+ * specified in class Message Type)
+ *@param cont next block of data
+ *@param data initial data to create Message Block with
+ */
+ public MessageBlock (int type,
+ MessageBlock cont,
+ String data)
+ {
+ this.flags_ = 0;
+ this.priority_ = 0;
+ this.next_ = null;
+ this.prev_ = null;
+
+ this.init (type, cont, data);
+ }
+
+ /**
+ * Create a Message Block. Note that this assumes that type of
+ * MessageBlock is MB_OBJECT.
+ *@param obj initial object to create a Message Block with.
+ */
+ public MessageBlock (Object obj)
+ {
+ this (MessageType.MB_OBJECT,
+ null,
+ obj);
+ }
+
+ /**
+ * Create a Message Block.
+ *@param type type of the Message Block (must be one of those
+ * specified in class Message Type)
+ *@param cont next block of data
+ *@param obj initial object to create Message Block with
+ */
+ public MessageBlock (int type,
+ MessageBlock cont,
+ Object obj)
+ {
+ this.init (type, cont, obj);
+ }
+
+ /* Initialize the Message Block
+ *@param data data to initialize Message Block with
+ */
+ public void init (String data)
+ {
+ this.base_ = new StringBuffer (data);
+ }
+
+ /**
+ * Initialize a Message Block.
+ *@param type type of the Message Block (must be one of those
+ * specified in class Message Type)
+ *@param cont next block of data
+ *@param data data to initialize Message Block with
+ */
+ public void init (int msgType,
+ MessageBlock msgCont,
+ String data)
+ {
+ if (data.length () == 0)
+ this.base_ = new StringBuffer (0);
+ else
+ this.base_ = new StringBuffer (data);
+ this.type_ = msgType;
+ this.cont_ = msgCont;
+ }
+
+ /**
+ * Initialize a Message Block. Note that this assumes that type of
+ * MessageBlock is MB_OBJECT.
+ *@param obj initial object to initialize a Message Block with.
+ */
+ public void init (Object obj)
+ {
+ this.init (MessageType.MB_OBJECT, null, obj);
+ }
+
+ /**
+ * Initialize a Message Block.
+ *@param type type of the Message Block (must be one of those
+ * specified in class Message Type)
+ *@param cont next block of data
+ *@param obj object to initialize Message Block with
+ */
+ public void init (int msgType,
+ MessageBlock msgCont,
+ Object obj)
+ {
+ this.obj_ = obj;
+ this.type_ = msgType;
+ this.cont_ = msgCont;
+ this.flags_ = 0;
+ this.priority_ = 0;
+ this.next_ = null;
+ this.prev_ = null;
+ }
+
+ /**
+ * Set message flags. Note that the flags will be set on top of
+ * already set flags.
+ *@param moreFlags flags to set for the Message Block.
+ */
+ public long setFlags (long moreFlags)
+ {
+ // Later we might mask more_flags so that user can't change
+ // internal ones: more_flags &= ~(USER_FLAGS -1).
+ this.flags_ = ACE.SET_BITS (this.flags_, moreFlags);
+ return this.flags_;
+ }
+
+ /**
+ * Unset message flags.
+ *@param lessFlags flags to unset for the Message Block.
+ */
+ public long clrFlags (long lessFlags)
+ {
+ // Later we might mask more_flags so that user can't change
+ // internal ones: less_flags &= ~(USER_FLAGS -1).
+ this.flags_ = ACE.CLR_BITS (this.flags_, lessFlags);
+ return this.flags_;
+ }
+
+ /**
+ * Get the message flags.
+ *@return Message flags
+ */
+ public long flags ()
+ {
+ return this.flags_;
+ }
+
+ /**
+ * Get the type of the message.
+ *@return message type
+ */
+ public int msgType ()
+ {
+ return this.type_;
+ }
+
+ /**
+ * Set the type of the message.
+ *@param t type of the message
+ */
+ public void msgType (int t)
+ {
+ this.type_ = t;
+ }
+
+ /**
+ * Get the class of the message. Note there are two classes,
+ * <normal> messages and <high-priority> messages.
+ *@return message class
+ */
+ public int msgClass ()
+ {
+ return this.msgType () >= MessageType.MB_PRIORITY
+ ? MessageType.MB_PRIORITY : MessageType.MB_NORMAL;
+ }
+
+ /**
+ * Find out if the message is a data message.
+ *@return true if message is a data message, false otherwise
+ */
+ public boolean isDataMsg ()
+ {
+ int mt = this.msgType ();
+ return mt == MessageType.MB_DATA
+ || mt == MessageType.MB_PROTO
+ || mt == MessageType.MB_PCPROTO;
+ }
+
+ /**
+ * Find out if the message is an object message.
+ *@return true if message is an object message, false otherwise
+ */
+ public boolean isObjMsg ()
+ {
+ int mt = this.msgType ();
+ return mt == MessageType.MB_OBJECT
+ || mt == MessageType.MB_PROTO
+ || mt == MessageType.MB_PCPROTO;
+ }
+
+ /**
+ * Get the priority of the message.
+ *@return message priority
+ */
+ public long msgPriority ()
+ {
+ return this.priority_;
+ }
+
+ /**
+ * Set the priority of the message.
+ *@param pri priority of the message
+ */
+ public void msgPriority (long pri)
+ {
+ this.priority_ = pri;
+ }
+
+ /**
+ * Get message data. This assumes that msgType is MB_DATA.
+ *@return message data
+ */
+ public String base ()
+ {
+ // Create a String object to return
+ char temp[] = new char [this.base_.length ()];
+ this.base_.getChars (0, this.base_.length (), temp, 0);
+ return new String (temp);
+ }
+
+ /**
+ * Set the message data. This assumes that msgType is MB_DATA.
+ *@param data message data
+ *@param msgFlags message flags
+ */
+ public void base (String data,
+ long msgFlags)
+ {
+ this.base_ = new StringBuffer (data);
+ this.flags_ = msgFlags;
+ }
+
+ /**
+ * Get message object. This assumes that msgType is MB_OBJECT.
+ *@return message object
+ */
+ public Object obj ()
+ {
+ return this.obj_;
+ }
+
+ /**
+ * Set the message object. This assumes that msgType is MB_OBJECT.
+ *@param object message object
+ *@param msgFlags message flags
+ */
+ public void obj (Object obj,
+ long msgFlags)
+ {
+ this.obj_ = obj;
+ this.flags_ = msgFlags;
+ }
+
+ // = The following four methods only make sense if the Message_Block
+ // is of type MB_DATA and not MB_OBJECT.
+
+ /**
+ * Get length of the message. This method only makes sense if the
+ * MessageBlock is of type MB_DATA and not MB_OBJECT.
+ *@return length of the message.
+ */
+ public int length ()
+ {
+ return this.base_.length ();
+ }
+
+ /**
+ * Set the length of the message. This method only makes sense if the
+ * MessageBlock is of type MB_DATA and not MB_OBJECT.
+ *@param n message length
+ */
+ public void length (int n)
+ {
+ this.base_.setLength (n);
+ }
+
+ /**
+ * Get size of the allocated buffer for the message. This method
+ * only makes sense if the MessageBlock is of type MB_DATA and not
+ * MB_OBJECT.
+ *@return size of the message buffer
+ */
+ public int size ()
+ {
+ return this.base_.capacity ();
+ }
+
+ /**
+ * Set the total size of the buffer. This method will grow the
+ * buffer if need be. Also, this method only makes sense if the
+ * MessageBlock is of type MB_DATA and not MB_OBJECT.
+ *@param n size of message buffer
+ */
+ public void size (int n)
+ {
+ this.base_.ensureCapacity (n);
+ }
+
+
+ /**
+ * Get the continuation field. The coninuation field is used to
+ * chain together composite messages.
+ *@return the continuation field
+ */
+ public MessageBlock cont ()
+ {
+ return this.cont_;
+ }
+
+ /**
+ * Set the continuation field. The coninuation field is used to
+ * chain together composite messages.
+ *@param msgCont continuation field
+ */
+ void cont (MessageBlock msgCont)
+ {
+ this.cont_ = msgCont;
+ }
+
+ /**
+ * Get link to next message. The next message points to the
+ * <MessageBlock> directly ahead in the MessageQueue.
+ *@return next message block
+ */
+ MessageBlock next ()
+ {
+ return this.next_;
+ }
+
+ /**
+ * Set link to next message. The next message points to the
+ * <MessageBlock> directly ahead in the MessageQueue.
+ *@param msgBlock next message block
+ */
+ void next (MessageBlock msgBlock)
+ {
+ this.next_ = msgBlock;
+ }
+
+ /**
+ * Get link to previous message. The previous message points to the
+ * <MessageBlock> directly before in the MessageQueue.
+ *@return previous message block
+ */
+ MessageBlock prev ()
+ {
+ return this.prev_;
+ }
+
+ /**
+ * Set link to previous message. The previous message points to the
+ * <MessageBlock> directly before in the MessageQueue.
+ *@param msgBlock previous message block
+ */
+ void prev (MessageBlock msgBlock)
+ {
+ this.prev_ = msgBlock;
+ }
+
+ private int type_;
+ // Type of message.
+
+ private long flags_;
+ // Misc flags.
+
+ private long priority_;
+ // Priority of message.
+
+ private StringBuffer base_;
+ // String data of message block (initialized to null).
+
+ private Object obj_;
+ // Object data of message block (initialized to null).
+
+ private MessageBlock cont_;
+ // Next message block in the chain.
+
+ private MessageBlock next_;
+ // Next message in the list.
+
+ private MessageBlock prev_;
+ // Previous message in the list.
+
+}
+
diff --git a/java/JACE/ASX/MessageQueue.java b/java/JACE/ASX/MessageQueue.java
new file mode 100644
index 00000000000..df25870dd52
--- /dev/null
+++ b/java/JACE/ASX/MessageQueue.java
@@ -0,0 +1,633 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * MessageQueue.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import java.util.Date;
+import JACE.OS.*;
+import JACE.Reactor.*;
+
+class NotFullCondition extends TimedWait
+{
+ public NotFullCondition (MessageQueue mq)
+ {
+ super (mq);
+ this.mq_ = mq;
+ }
+
+ public boolean condition () {
+ // Delegate to the appropriate conditional
+ // check on the MessageQueue.
+ return !this.mq_.isFull ();
+ }
+ private MessageQueue mq_;
+}
+
+class NotEmptyCondition extends TimedWait
+{
+ public NotEmptyCondition (MessageQueue mq)
+ {
+ super (mq);
+ this.mq_ = mq;
+ }
+
+ public boolean condition () {
+ // Delegate to the appropriate conditional
+ // check on the MessageQueue.
+ return !this.mq_.isEmpty ();
+ }
+ private MessageQueue mq_;
+}
+
+
+/**
+ * A thread-safe message queueing facility, modeled after the
+ * queueing facilities in System V StreamS. <P>
+ *
+ * <tt>MessageQueue</tt> is the central queueing facility for messages
+ * in the ASX framework. All operations are thread-safe, as it is intended
+ * to be used for inter-thread communication (<em>e.g.</em>, a producer and
+ * consumer thread joined by a <tt>MessageQueue</tt>). The queue
+ * consists of <tt>MessageBlock</tt>s.
+ *</blockquote>
+ *
+ *@see MessageBlock
+ *@see TimeValue
+ */
+public class MessageQueue
+{
+ /**
+ * Default constructor
+ */
+ public MessageQueue ()
+ {
+ this (DEFAULT_HWM, DEFAULT_LWM);
+ }
+
+ /**
+ * Create a Message Queue with high and low water marks.
+ *@param hwm High water mark (max number of bytes allowed in the
+ * queue)
+ *@param lwm Low water mark (min number of bytes in the queue)
+ */
+ public MessageQueue (int hwm, int lwm)
+ {
+ if (this.open (hwm, lwm) == -1)
+ ACE.ERROR ("open");
+ }
+
+ /**
+ * Initialize a Message Queue with high and low water marks.
+ *@param hwm High water mark (max number of bytes allowed in the
+ * queue)
+ *@param lwm Low water mark (min number of bytes in the queue)
+ */
+ public synchronized int open (int hwm, int lwm)
+ {
+ this.highWaterMark_ = hwm;
+ this.lowWaterMark_ = lwm;
+ this.deactivated_ = false;
+ this.currentBytes_ = 0;
+ this.currentCount_ = 0;
+ this.tail_ = null;
+ this.head_ = null;
+ return 0;
+ }
+
+ // ************ Note! ***********
+ // = For enqueue, enqueueHead, enqueueTail, and dequeueHead if
+ // timeout is specified, the caller will wait until the *absolute time*
+ // tv. Calls will return, however, when queue is closed,
+ // deactivated, or if it is past the time tv
+
+ /**
+ * Enqueue a <MessageBlock> into the <MessageQueue> in accordance
+ * with its <msgPriority> (0 is lowest priority). Note that the
+ * call will block (unless the queue has been deactivated).
+ *
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ *@param newItem item to enqueue onto the Message Queue
+ *@return -1 on failure, else the number of items still on the queue.
+ */
+ public synchronized int enqueue (MessageBlock newItem) throws InterruptedException
+ {
+ return this.enqueue (newItem, null);
+ }
+
+ /**
+ * Enqueue a <MessageBlock> into the <MessageQueue> in accordance
+ * with its <msgPriority> (0 is lowest priority). Note that the
+ * call will return if the queue has been deactivated or it is
+ * later than the specified absolute time value.
+ *@param newItem item to enqueue onto the Message Queue
+ *@param tv absolute TimeValue to timeout after
+ *@return -1 on failure, else the number of items still on the
+ * queue.
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ public synchronized int enqueue (MessageBlock newItem,
+ TimeValue tv) throws InterruptedException
+ {
+ int result = -1;
+ if (this.deactivated_)
+ return -1;
+ try
+ {
+ if (tv == null) // Need to do a blocking wait
+ notFullCondition_.timedWait ();
+ else // Need to do a timed wait
+ notFullCondition_.timedWait (tv);
+ }
+ catch (TimeoutException e)
+ {
+ return -1;
+ }
+
+ // Check again if queue is still active
+ if (this.deactivated_)
+ return -1;
+ else
+ result = this.enqueueInternal (newItem);
+
+ // Tell any blocked threads that the queue has a new item!
+ this.notEmptyCondition_.broadcast ();
+ return result;
+ }
+
+ /**
+ * Enqueue a <MessageBlock> at the end of the <MessageQueue>. Note
+ * that the call will block (unless the queue has been deactivated).
+ *@param newItem item to enqueue onto the Message Queue
+ *@return -1 on failure, else the number of items still on the queue.
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ public synchronized int enqueueTail (MessageBlock newItem) throws InterruptedException
+ {
+ return this.enqueueTail (newItem, null);
+ }
+
+ /**
+ * Enqueue a <MessageBlock> at the end of the <MessageQueue>. Note
+ * that the call will return when it's later than the given TimeValue or
+ * if the queue has been deactivated.
+ *@param newItem item to enqueue onto the Message Queue
+ *@param tv absolute TimeValue to wait until before returning (unless
+ * the operation compeltes before this time)
+ *@return -1 on failure, else the number of items still on the queue.
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ public synchronized int enqueueTail (MessageBlock newItem,
+ TimeValue tv) throws InterruptedException
+ {
+ int result = -1;
+ if (this.deactivated_)
+ return -1;
+ try
+ {
+ if (tv == null) // Need to do a blocking wait
+ notFullCondition_.timedWait ();
+ else // Need to do a timed wait
+ notFullCondition_.timedWait (tv);
+ }
+ catch (TimeoutException e)
+ {
+ return -1;
+ }
+
+ // Check again if queue is still active
+ if (this.deactivated_)
+ return -1;
+ else
+ result = this.enqueueTailInternal (newItem);
+
+ // Tell any blocked threads that the queue has a new item!
+ this.notEmptyCondition_.broadcast ();
+ return result;
+ }
+
+ /**
+ * Enqueue a <MessageBlock> at the head of the <MessageQueue>. Note
+ * that the call will block (unless the queue has been deactivated).
+ *@param newItem item to enqueue onto the Message Queue
+ *@return -1 on failure, else the number of items still on the queue.
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ public synchronized int enqueueHead (MessageBlock newItem) throws InterruptedException
+ {
+ return this.enqueueHead (newItem, null);
+ }
+
+ /**
+ * Enqueue a <MessageBlock> at the head of the <MessageQueue>. Note
+ * that the call will return when it's later than the given TimeValue or
+ * if the queue has been deactivated.
+ *@param newItem item to enqueue onto the Message Queue
+ *@param tv absolute TimeValue to wait until before returning (unless
+ * the operation completes before that time)
+ *@return -1 on failure, else the number of items still on the queue.
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ public synchronized int enqueueHead (MessageBlock newItem,
+ TimeValue tv) throws InterruptedException
+ {
+ int result = -1;
+ if (this.deactivated_)
+ return -1;
+ try
+ {
+ if (tv == null) // Need to do a blocking wait
+ notFullCondition_.timedWait ();
+ else // Need to do a timed wait
+ notFullCondition_.timedWait (tv);
+ }
+ catch (TimeoutException e)
+ {
+ return -1;
+ }
+
+ // Check again if queue is still active
+ if (this.deactivated_)
+ return -1;
+ else
+ result = this.enqueueHeadInternal (newItem);
+
+ // Tell any blocked threads that the queue has a new item!
+ this.notEmptyCondition_.broadcast ();
+ return result;
+ }
+
+ /**
+ * Dequeue and return the <MessageBlock> at the head of the
+ * <MessageQueue>. Note that the call will block (unless the queue
+ * has been deactivated).
+ *@return null on failure, else the <MessageBlock> at the head of queue.
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ public synchronized MessageBlock dequeueHead () throws InterruptedException
+ {
+ return this.dequeueHead (null);
+ }
+
+ /**
+ * Dequeue and return the <MessageBlock> at the head of the
+ * <MessageQueue>. Note that the call when return if the queue has
+ * been deactivated or when the current time is later than the given
+ * time value.
+ *@param tv absolute time timeout (blocks indefinitely if null)
+ *@return null on failure, else the <MessageBlock> at the head of queue.
+ *@exception InterruptedException Interrupted while accessing queue
+ */
+ public synchronized MessageBlock dequeueHead (TimeValue tv)
+ throws InterruptedException
+ {
+ MessageBlock result = null;
+ if (this.deactivated_)
+ return null;
+ try
+ {
+ if (tv == null) // Need to do a blocking wait
+ notEmptyCondition_.timedWait ();
+ else // Need to do a timed wait
+ notEmptyCondition_.timedWait (tv);
+ }
+ catch (TimeoutException e)
+ {
+ return null;
+ }
+
+ // Check again if queue is still active
+ if (this.deactivated_)
+ return null;
+ else
+ result = this.dequeueHeadInternal ();
+
+ // Tell any blocked threads that the queue has room for an item!
+ this.notFullCondition_.broadcast ();
+ return result;
+ }
+
+ /**
+ * Check if queue is full.
+ *@return true if queue is full, else false.
+ */
+ public synchronized boolean isFull ()
+ {
+ return this.isFullInternal ();
+ }
+
+ /**
+ * Check if queue is empty.
+ *@return true if queue is empty, else false.
+ */
+ public synchronized boolean isEmpty ()
+ {
+ return this.isEmptyInternal ();
+ }
+
+ /**
+ * Get total number of bytes on the queue.
+ *@return total number number of bytes on the queue
+ */
+ public int messageBytes ()
+ {
+ return this.currentBytes_;
+ }
+
+ /**
+ * Get total number of messages on the queue.
+ *@return total number number of messages on the queue
+ */
+ public int messageCount ()
+ {
+ return this.currentCount_;
+ }
+
+ // = Flow control routines
+
+ /**
+ * Get high watermark.
+ *@return high watermark
+ */
+ public int highWaterMark ()
+ {
+ return this.highWaterMark_;
+ }
+
+ /**
+ * Set high watermark.
+ *@param hwm high watermark
+ */
+ public void highWaterMark (int hwm)
+ {
+ this.highWaterMark_ = hwm;
+ }
+
+ /**
+ * Get low watermark.
+ *@return low watermark
+ */
+ public int lowWaterMark ()
+ {
+ return this.lowWaterMark_;
+ }
+
+ /**
+ * Set low watermark.
+ *@param lwm low watermark
+ */
+ public void lowWaterMark (int lwm)
+ {
+ this.lowWaterMark_ = lwm;
+ }
+
+ // = Activation control methods.
+
+ /**
+ * Deactivate the queue and wakeup all threads waiting on the queue
+ * so they can continue. No messages are removed from the queue,
+ * however. Any other operations called until the queue is
+ * activated again will immediately return -1.
+ *@return WAS_INACTIVE if queue was inactive before the call and
+ * WAS_ACTIVE if queue was active before the call.
+ */
+ public synchronized int deactivate ()
+ {
+ return this.deactivateInternal ();
+ }
+
+
+ /**
+ * Reactivate the queue so that threads can enqueue and dequeue
+ * messages again.
+ *@return WAS_INACTIVE if queue was inactive before the call and
+ * WAS_ACTIVE if queue was active before the call.
+ */
+ public synchronized int activate ()
+ {
+ return this.activateInternal ();
+ }
+
+ protected boolean isEmptyInternal ()
+ {
+ // Not sure about this one!!!!
+ return this.currentBytes_ <= this.lowWaterMark_ && this.currentCount_ <= 0;
+ }
+
+ protected boolean isFullInternal ()
+ {
+ return this.currentBytes_ > this.highWaterMark_;
+ }
+
+ protected int deactivateInternal ()
+ {
+ int currentStatus =
+ this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
+
+ this.notFullCondition_.broadcast ();
+ this.notEmptyCondition_.broadcast ();
+
+ this.deactivated_ = true;
+ return currentStatus;
+ }
+
+ protected int activateInternal ()
+ {
+ int currentStatus =
+ this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
+ this.deactivated_ = false;
+
+ return currentStatus;
+ }
+
+ protected int enqueueTailInternal (MessageBlock newItem)
+ {
+ if (newItem == null)
+ return -1;
+
+ // List was empty, so build a new one.
+ if (this.tail_ == null)
+ {
+ this.head_ = newItem;
+ this.tail_ = newItem;
+ newItem.next (null);
+ newItem.prev (null);
+ }
+ // Link at the end.
+ else
+ {
+ newItem.next (null);
+ this.tail_.next (newItem);
+ newItem.prev (this.tail_);
+ this.tail_ = newItem;
+ }
+
+ if (newItem.msgType() != MessageType.MB_OBJECT)
+ {
+ // Make sure to count *all* the bytes in a composite message!!!
+ for (MessageBlock temp = newItem;
+ temp != null;
+ temp = temp.cont ())
+ this.currentBytes_ += temp.size ();
+ }
+
+ this.currentCount_++;
+ return this.currentCount_;
+ }
+
+ protected int enqueueHeadInternal (MessageBlock newItem)
+ {
+ if (newItem == null)
+ return -1;
+
+ newItem.prev (null);
+ newItem.next (this.head_);
+
+ if (this.head_ != null)
+ this.head_.prev (newItem);
+ else
+ this.tail_ = newItem;
+
+ this.head_ = newItem;
+
+ if (newItem.msgType() != MessageType.MB_OBJECT)
+ {
+ // Make sure to count *all* the bytes in a composite message!!!
+ for (MessageBlock temp = newItem;
+ temp != null;
+ temp = temp.cont ())
+ this.currentBytes_ += temp.size ();
+ }
+
+ this.currentCount_++;
+
+ return this.currentCount_;
+ }
+
+ protected int enqueueInternal (MessageBlock newItem)
+ {
+ if (newItem == null)
+ return -1;
+
+ if (this.head_ == null)
+ // Check for simple case of an empty queue, where all we need to
+ // do is insert <newItem> into the head.
+ return this.enqueueHeadInternal (newItem);
+ else
+ {
+ MessageBlock temp;
+
+ // Figure out where the new item goes relative to its priority.
+
+ for (temp = this.head_;
+ temp != null;
+ temp = temp.next ())
+ {
+ if (temp.msgPriority () <= newItem.msgPriority ())
+ // Break out when we've located an item that has lower
+ // priority that <newItem>.
+ break;
+ }
+
+ if (temp == null)
+ // Check for simple case of inserting at the end of the queue,
+ // where all we need to do is insert <newItem> after the
+ // current tail.
+ return this.enqueueTailInternal (newItem);
+ else if (temp.prev () == null)
+ // Check for simple case of inserting at the beginning of the
+ // queue, where all we need to do is insert <newItem> before
+ // the current head.
+ return this.enqueueHeadInternal (newItem);
+ else
+ {
+ // Insert the message right before the item of equal or lower
+ // priority.
+ newItem.next (temp);
+ newItem.prev (temp.prev ());
+ temp.prev ().next (newItem);
+ temp.prev (newItem);
+ }
+ }
+
+ if (newItem.msgType() != MessageType.MB_OBJECT)
+ {
+ // Make sure to count *all* the bytes in a composite message!!!
+ for (MessageBlock temp = newItem;
+ temp != null;
+ temp = temp.cont ())
+ this.currentBytes_ += temp.size ();
+ }
+
+ this.currentCount_++;
+ return this.currentCount_;
+ }
+
+ protected MessageBlock dequeueHeadInternal ()
+ {
+ MessageBlock firstItem = this.head_;
+ this.head_ = this.head_.next ();
+
+ if (this.head_ == null)
+ this.tail_ = null;
+
+ if (firstItem.msgType() != MessageType.MB_OBJECT)
+ {
+ // Make sure to subtract off all of the bytes associated with this
+ // message.
+ for (MessageBlock temp = firstItem;
+ temp != null;
+ temp = temp.cont ())
+ this.currentBytes_ -= temp.size ();
+ }
+
+ this.currentCount_--;
+ return firstItem;
+ }
+
+
+ /** Default high watermark (16 K). */
+ public final static int DEFAULT_HWM = 16 * 1024;
+
+ /** Default low watermark. */
+ public final static int DEFAULT_LWM = 0;
+
+ /** Message queue was active before activate() or deactivate(). */
+ public final static int WAS_ACTIVE = 1;
+
+ /** Message queue was inactive before activate() or deactivate(). */
+ public final static int WAS_INACTIVE = 2;
+
+ private int highWaterMark_;
+ // Greatest number of bytes before blocking.
+
+ private int lowWaterMark_;
+ // Lowest number of bytes before unblocking occurs.
+
+ private boolean deactivated_;
+ // Indicates that the queue is inactive.
+
+ private int currentBytes_;
+ // Current number of bytes in the queue.
+
+ private int currentCount_;
+ // Current number of messages in the queue.
+
+ private MessageBlock head_;
+ // Head of Message_Block list.
+
+ private MessageBlock tail_;
+ // Tail of Message_Block list.
+
+ // The Delegated Notification mechanisms.
+ private NotFullCondition notFullCondition_ = new NotFullCondition (this);
+ private NotEmptyCondition notEmptyCondition_ = new NotEmptyCondition (this);
+
+}
diff --git a/java/JACE/ASX/MessageType.java b/java/JACE/ASX/MessageType.java
new file mode 100644
index 00000000000..97e33a6c6ba
--- /dev/null
+++ b/java/JACE/ASX/MessageType.java
@@ -0,0 +1,102 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * MessageType.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+/**
+ * Message types used by MessageBlock. <P>
+ *
+ * Defines bit masks used to identify various types of messages.<P>
+ *
+ * This class is not intended to be instantiable.
+ *
+ *@see MessageBlock
+ */
+public class MessageType
+{
+ // = Data and protocol messages (regular and priority)
+ /** regular data */
+ public static final int MB_DATA = 0x01;
+
+ /** protocol control */
+ public static final int MB_PROTO = 0x02;
+
+ /** regular data */
+ public static final int MB_OBJECT = 0x09;
+
+
+ // = Control messages (regular and priority)
+ /** line break */
+ public static final int MB_BREAK = 0x03;
+
+ /** pass file pointer */
+ public static final int MB_PASSFP = 0x04;
+
+ /** post an event to an event queue */
+ public static final int MB_EVENT = 0x05;
+
+ /** generate process signal */
+ public static final int MB_SIG = 0x06;
+
+ /** ioctl; set/get params */
+ public static final int MB_IOCTL = 0x07;
+
+ /** set various stream head options */
+ public static final int MB_SETOPTS = 0x08;
+
+
+ // = Control messages (high priority; go to head of queue)
+ /** acknowledge ioctl */
+ public static final int MB_IOCACK = 0x81;
+
+ /** negative ioctl acknowledge */
+ public static final int MB_IOCNAK = 0x82;
+
+ /** priority proto message */
+ public static final int MB_PCPROTO = 0x83;
+
+ /** generate process signal */
+ public static final int MB_PCSIG = 0x84;
+
+ /** generate read notification */
+ public static final int MB_READ = 0x85;
+
+ /** flush your queues */
+ public static final int MB_FLUSH = 0x86;
+
+ /** stop transmission immediately */
+ public static final int MB_STOP = 0x87;
+
+ /** restart transmission after stop */
+ public static final int MB_START = 0x88;
+
+ /** line disconnect */
+ public static final int MB_HANGUP = 0x89;
+
+ /** fatal error used to set u.u_error */
+ public static final int MB_ERROR = 0x8a;
+
+ /** post an event to an event queue */
+ public static final int MB_PCEVENT = 0x8b;
+
+
+ /** Normal priority messages */
+ public static final int MB_NORMAL = 0x00;
+
+ /** High priority control messages */
+ public static final int MB_PRIORITY = 0x80;
+
+ // Default private constructor to avoid instantiation
+ private MessageType ()
+ {
+ }
+}
+
diff --git a/java/JACE/ASX/Module.java b/java/JACE/ASX/Module.java
new file mode 100644
index 00000000000..b6a3468cfd1
--- /dev/null
+++ b/java/JACE/ASX/Module.java
@@ -0,0 +1,246 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * Module.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+
+/**
+ * Provides an abstraction for managing a bi-directional flow of
+ * messages. <P>
+ *
+ * This is based on the Module concept in System V Streams,
+ * which contains a pair of Tasks, one for handling upstream
+ * processing, one for handling downstream processing.
+ */
+public class Module
+{
+ // = Initialization and termination methods.
+
+ /**
+ * Create an empty Module.
+ */
+ public Module ()
+ {
+ // Do nothing...
+ this.name ("<unknown>");
+ }
+
+ /*
+ * Create an initialized module.
+ *@param modName identity of the module.
+ *@param writerQ writer task of the module.
+ *@param readerQ reader task of the module.
+ *@param flags Module flags
+ */
+ public Module (String modName,
+ Task writerQ,
+ Task readerQ,
+ Object flags)
+ {
+ this.open (modName, writerQ, readerQ, flags);
+ }
+
+ /*
+ * Create an initialized module.
+ *@param modName identity of the module.
+ *@param writerQ writer task of the module.
+ *@param readerQ reader task of the module.
+ *@param flags Module flags
+ */
+ public void open (String modName,
+ Task writerQ,
+ Task readerQ,
+ Object arg)
+ {
+ this.name (modName);
+ this.arg_ = arg;
+
+ if (writerQ == null)
+ writerQ = new ThruTask ();
+ if (readerQ == null)
+ readerQ = new ThruTask ();
+
+ this.reader (readerQ);
+ this.writer (writerQ);
+
+ // Setup back pointers.
+ readerQ.module (this);
+ writerQ.module (this);
+ }
+
+
+ /*
+ * Set the writer task.
+ *@param q the writer task
+ */
+ public void writer (Task q)
+ {
+ this.qPair_[1] = q;
+ if (q != null)
+ q.flags (ACE.CLR_BITS (q.flags (), TaskFlags.ACE_READER));
+ }
+
+ /*
+ * Set the reader task.
+ *@param q the reader task
+ */
+ public void reader (Task q)
+ {
+ this.qPair_[0] = q;
+ if (q != null)
+ q.flags (ACE.SET_BITS (q.flags (), TaskFlags.ACE_READER));
+ }
+
+ /*
+ * Link this Module on top of Module.
+ *@param m the module to link this on top of.
+ */
+ public void link (Module m)
+ {
+ this.next (m);
+ this.writer ().next (m.writer ());
+ m.reader ().next (this.reader ());
+ }
+
+ /*
+ * Set and get pointer to sibling Task in Module.
+ *@param orig the task to get the sibling for
+ *@return the sibling of the task
+ */
+ public Task sibling (Task orig)
+ {
+ if (this.qPair_[0] == orig)
+ return this.qPair_[1];
+ else if (this.qPair_[1] == orig)
+ return this.qPair_[0];
+ else
+ return null;
+ }
+
+ /*
+ * Close down the module and its tasks.
+ *@param flags Module flags
+ *@return 0 on success, -1 on failure
+ */
+ public int close (long flags)
+ {
+ Task readerQ = this.reader ();
+ Task writerQ = this.writer ();
+ int result = 0;
+
+ if (readerQ != null)
+ {
+ if (readerQ.close (flags) == -1)
+ result = -1;
+ readerQ.flush (flags);
+ readerQ.next (null);
+ }
+
+ if (writerQ != null)
+ {
+ if (writerQ.close (flags) == -1)
+ result = -1;
+ writerQ.flush (flags);
+ writerQ.next (null);
+ }
+
+ return result;
+ }
+
+ /*
+ * Get the argument passed to tasks.
+ *@return the argument passed to tasks.
+ */
+ public Object arg ()
+ {
+ return this.arg_;
+ }
+
+ /*
+ * Set the argument to be passed to tasks.
+ *@param a the argument to be passed to tasks.
+ */
+ public void arg (Object a)
+ {
+ this.arg_ = a;
+ }
+
+ /*
+ * Get the name of the module.
+ *@return the name of the module.
+ */
+ public String name ()
+ {
+ return this.name_;
+ }
+
+ /*
+ * Set the name of the module.
+ *@param n the name of the module.
+ */
+ public void name (String n)
+ {
+ this.name_ = n;
+ }
+
+ /*
+ * Get the writer task of the module.
+ *@return the writer task of the module.
+ */
+ public Task writer ()
+ {
+ return this.qPair_[1];
+ }
+
+ /*
+ * Get the reader task of the module.
+ *@return the reader task of the module.
+ */
+ public Task reader ()
+ {
+ return this.qPair_[0];
+ }
+
+ /*
+ * Get the next pointer to the module above in the stream.
+ *@return the next pointer to the module above in the stream.
+ */
+ public Module next ()
+ {
+ return this.next_;
+ }
+
+ /*
+ * Set the next pointer to the module above in the stream.
+ *@param m the next pointer to the module above in the stream.
+ */
+ public void next (Module m)
+ {
+ this.next_ = m;
+ }
+
+ private Task qPair [] = new Task[2];
+ // Pair of Tasks that form the "read-side" and "write-side" of the
+ // ACE_Module partitioning.
+
+ private String name_ = null;
+ // Name of the ACE_Module.
+
+ private Module next_;
+ // Next ACE_Module in the stack.
+
+ private Object arg_;
+ // Argument passed through to the reader and writer task when they
+ // are opened.
+
+}
+
diff --git a/java/JACE/ASX/Stream.java b/java/JACE/ASX/Stream.java
new file mode 100644
index 00000000000..6a968714ab7
--- /dev/null
+++ b/java/JACE/ASX/Stream.java
@@ -0,0 +1,436 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * Stream.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+
+/**
+ * This class is the primary abstraction for the ASX framework.
+ * It is moduled after System V Stream. <P>
+ *
+ * A Stream consists of a stack of Modules, each of which
+ * contains two Tasks.
+ *
+ *@see Module
+ *@see Task
+ */
+
+public class Stream
+{
+
+ public Stream ()
+ {
+ this (null, null, null);
+ }
+
+ // Create a Stream consisting of <head> and <tail> as the Stream
+ // head and Stream tail, respectively. If these are 0 then the
+ // <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively.
+ // <arg> is the value past in to the open() methods of the tasks.
+
+ public Stream (Object a,
+ Module head,
+ Module tail)
+ {
+ this.linkedUs_ = null;
+ // this.final_close_ = this.lock_;
+
+ if (this.open (a, head, tail) == -1)
+ ACE.ERROR ("open" + head.name () + " " + tail.name ());
+ }
+
+ public int push (Module newTop)
+ {
+ if (this.pushModule (newTop,
+ this.streamHead_.next (),
+ this.streamHead_) == -1)
+ return -1;
+ else
+ return 0;
+ }
+
+ // Note that the timeout tv is absolute time
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ return this.streamHead_.writer ().put (mb, tv);
+ }
+
+ // Note that the timeout tv is absolute time
+ public MessageBlock get (TimeValue tv) throws InterruptedException
+ {
+ return this.streamHead_.reader ().getq (tv);
+ }
+
+// Return the "top" ACE_Module in a ACE_Stream, skipping over the
+// stream_head.
+
+ public Module top ()
+ {
+ if (this.streamHead_.next () == this.streamTail_)
+ return null;
+ else
+ return this.streamHead_.next ();
+ }
+
+// Remove the "top" ACE_Module in a ACE_Stream, skipping over the
+// stream_head.
+
+ public int pop (long flags)
+ {
+ if (this.streamHead_.next () == this.streamTail_)
+ return -1;
+ else
+ {
+ // Skip over the ACE_Stream head.
+ Module top = this.streamHead_.next ();
+ Module newTop = top.next ();
+
+ this.streamHead_.next (newTop);
+
+ // Close the top ACE_Module.
+
+ top.close (flags);
+
+ this.streamHead_.writer ().next (newTop.writer ());
+ newTop.reader ().next (this.streamHead_.reader ());
+
+ return 0;
+ }
+ }
+
+// Remove a named ACE_Module from an arbitrary place in the
+// ACE_Stream.
+
+ public int remove (String name, long flags)
+ {
+ Module prev = null;
+
+ for (Module mod = this.streamHead_;
+ mod != null; mod = mod.next ())
+ if (name.compareTo (mod.name ()) == 0)
+ {
+ if (prev == null) // Deleting ACE_Stream Head
+ this.streamHead_.link (mod.next ());
+ else
+ prev.link (mod.next ());
+
+ mod.close (flags);
+ return 0;
+ }
+ else
+ prev = mod;
+
+ return -1;
+ }
+
+ public Module find (String name)
+ {
+ for (Module mod = this.streamHead_;
+ mod != null;
+ mod = mod.next ())
+ if (name.compareTo (mod.name ()) == 0)
+ return mod;
+
+ return null;
+ }
+
+// Actually push a module onto the stack...
+
+ private int pushModule (Module newTop,
+ Module currentTop,
+ Module head)
+ {
+ Task ntReader = newTop.reader ();
+ Task ntWriter = newTop.writer ();
+ Task ctReader = null;
+ Task ctWriter = null;
+
+ if (currentTop != null)
+ {
+ ctReader = currentTop.reader ();
+ ctWriter = currentTop.writer ();
+ ctReader.next (ntReader);
+ }
+
+ ntWriter.next (ctWriter);
+
+ if (head != null)
+ {
+ if (head != newTop)
+ head.link (newTop);
+ }
+ else
+ ntReader.next (null);
+
+ newTop.next (currentTop);
+
+ if (ntReader.open (newTop.arg ()) == -1)
+ return -1;
+
+ if (ntWriter.open (newTop.arg ()) == -1)
+ return -1;
+ return 0;
+ }
+
+ public synchronized int open (Object a,
+ Module head,
+ Module tail)
+ {
+ Task h1 = null, h2 = null;
+ Task t1 = null, t2 = null;
+
+ if (head == null)
+ {
+ h1 = new StreamHead ();
+ h2 = new StreamHead ();
+ head = new Module ("ACEStreamHead", h1, h2, a);
+ }
+
+ if (tail == null)
+ {
+ t1 = new StreamTail ();
+ t2 = new StreamTail ();
+ tail = new Module ("ACEStreamTail",
+ t1, t2, a);
+ }
+
+ // Make sure *all* the allocation succeeded!
+ if (h1 == null || h2 == null || head == null
+ || t1 == null || t2 == null || tail == null)
+ {
+ // Close up!
+ head.close (0);
+ tail.close (0);
+ return -1;
+ }
+
+ this.streamHead_ = head;
+ this.streamTail_ = tail;
+
+ if (this.pushModule (this.streamTail_,
+ null, null) == -1)
+ return -1;
+ else if (this.pushModule (this.streamHead_,
+ this.streamTail_,
+ this.streamHead_) == -1)
+ return -1;
+ else
+ return 0;
+ }
+
+ public synchronized int close (long flags)
+ {
+ if (this.streamHead_ != null
+ && this.streamTail_ != null)
+ {
+ // Don't bother checking return value here.
+ this.unlinkInternal ();
+
+ int result = 0;
+
+ // Remove and cleanup all the intermediate modules.
+
+ while (this.streamHead_.next () != this.streamTail_)
+ {
+ if (this.pop (flags) == -1)
+ result = -1;
+ }
+
+ // Clean up the head and tail of the stream.
+ if (this.streamHead_.close (flags) == -1)
+ result = -1;
+ if (this.streamTail_.close (flags) == -1)
+ result = -1;
+
+ this.streamHead_ = null;
+ this.streamTail_ = null;
+
+ // Tell all threads waiting on the close that we are done.
+ // this.final_close_.broadcast ();
+ return result;
+ }
+ return 0;
+ }
+
+ public int control (int cmd, Object a) throws InterruptedException
+ {
+ IOCntlMsg ioc = new IOCntlMsg (cmd);
+
+ // Create a data block that contains the user-supplied data.
+ MessageBlock db =
+ new MessageBlock (MessageType.MB_IOCTL,
+ null,
+ a);
+
+ // Create a control block that contains the control field and a
+ // pointer to the data block.
+ MessageBlock cb =
+ new MessageBlock (MessageType.MB_IOCTL,
+ db,
+ (Object) ioc);
+
+ int result = 0;
+
+ if (this.streamHead_.writer ().put (cb, null) == -1)
+ result = -1;
+ else if ((cb = this.streamHead_.reader ().getq (null)) == null)
+ result = -1;
+ else
+ result = ((IOCntlMsg ) cb.obj ()).rval ();
+
+ return result;
+ }
+
+// Link two streams together at their bottom-most Modules (i.e., the
+// one just above the Stream tail). Note that all of this is premised
+// on the fact that the Stream head and Stream tail are non-NULL...
+// This must be called with locks held.
+
+ private int linkInternal (Stream us)
+ {
+ this.linkedUs_ = us;
+ // Make sure the other side is also linked to us!
+ us.linkedUs_ = this;
+
+ Module myTail = this.streamHead_;
+
+ if (myTail == null)
+ return -1;
+
+ // Locate the module just above our Stream tail.
+ while (myTail.next () != this.streamTail_)
+ myTail = myTail.next ();
+
+ Module otherTail = us.streamHead_;
+
+ if (otherTail == null)
+ return -1;
+
+ // Locate the module just above the other Stream's tail.
+ while (otherTail.next () != us.streamTail_)
+ otherTail = otherTail.next ();
+
+ // Reattach the pointers so that the two streams are linked!
+ myTail.writer ().next (otherTail.reader ());
+ otherTail.writer ().next (myTail.reader ());
+ return 0;
+ }
+
+ public synchronized int link (Stream us)
+ {
+ return this.linkInternal (us);
+ }
+
+// Must be called with locks held...
+
+ private int unlinkInternal ()
+ {
+ // Only try to unlink if we are in fact still linked!
+
+ if (this.linkedUs_ != null)
+ {
+ Module myTail = this.streamHead_;
+
+ // Only relink if we still exist!
+ if (myTail != null)
+ {
+ // Find the module that's just before our stream tail.
+ while (myTail.next () != this.streamTail_)
+ myTail = myTail.next ();
+
+ // Restore the writer's next() link to our tail.
+ myTail.writer ().next (this.streamTail_.writer ());
+ }
+
+ Module otherTail = this.linkedUs_.streamHead_;
+
+ // Only fiddle with the other side if it in fact still remains.
+ if (otherTail != null)
+ {
+ while (otherTail.next () != this.linkedUs_.streamTail_)
+ otherTail = otherTail.next ();
+
+ otherTail.writer ().next (this.linkedUs_.streamTail_.writer ());
+
+ }
+
+ // Make sure the other side is also aware that it's been unlinked!
+ this.linkedUs_.linkedUs_ = null;
+
+ this.linkedUs_ = null;
+ return 0;
+ }
+ else
+ return -1;
+ }
+
+ public synchronized int unlink ()
+ {
+ return this.unlinkInternal ();
+ }
+
+ public void dump ()
+ {
+ ACE.DEBUG ("-------- module links --------");
+
+ for (Module mp = this.streamHead_; ; mp = mp.next ())
+ {
+ ACE.DEBUG ("module name = " + mp.name ());
+ if (mp == this.streamTail_)
+ break;
+ }
+
+ ACE.DEBUG ("-------- writer links --------");
+
+ Task tp;
+
+ for (tp = this.streamHead_.writer (); ; tp = tp.next ())
+ {
+ ACE.DEBUG ("writer queue name = " + tp.name ());
+ tp.dump ();
+ ACE.DEBUG ("-------\n");
+ if (tp == this.streamTail_.writer ()
+ || (this.linkedUs_ != null && tp == this.linkedUs_.streamHead_.reader ()))
+ break;
+ }
+
+ ACE.DEBUG ("-------- reader links --------\n");
+ for (tp = this.streamTail_.reader (); ; tp = tp.next ())
+ {
+ ACE.DEBUG ("reader queue name = " + tp.name ());
+ tp.dump ();
+ ACE.DEBUG ("-------\n");
+ if (tp == this.streamHead_.reader ()
+ || (this.linkedUs_ != null && tp == this.linkedUs_.streamHead_.writer ()))
+ break;
+ }
+ }
+
+ Module streamHead_ = null;
+ // Pointer to the head of the stream.
+
+ Module streamTail_ = null;
+ // Pointer to the tail of the stream.
+
+ Stream linkedUs_ = null;
+ // Pointer to an adjoining linked stream.
+
+ // = Synchronization objects used for thread-safe streams.
+ // ACE_SYNCH_MUTEX lock_;
+ // Protect the stream against race conditions.
+
+ // ACE_SYNCH_CONDITION final_close_;
+ // Use to tell all threads waiting on the close that we are done.
+
+}
+
+
diff --git a/java/JACE/ASX/StreamHead.java b/java/JACE/ASX/StreamHead.java
new file mode 100644
index 00000000000..1492b43a297
--- /dev/null
+++ b/java/JACE/ASX/StreamHead.java
@@ -0,0 +1,123 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * StreamHead.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+
+/**
+ * Standard module that acts as the head of a ustream.
+ */
+
+public class StreamHead extends Task
+{
+ // Module that acts as the head of a Stream.
+
+ public int open (Object obj)
+ {
+ return 0;
+ }
+
+ public int close (long l)
+ {
+ return 0;
+ }
+
+ public int svc ()
+ {
+ return -1;
+ }
+
+ private int control (MessageBlock mb)
+ {
+
+ IOCntlMsg ioc = (IOCntlMsg) mb.obj ();
+ int cmd = ioc.cmd ();
+
+ switch (cmd)
+ {
+ case IOCntlCmds.SET_LWM:
+ case IOCntlCmds.SET_HWM:
+ this.waterMarks (cmd, mb.cont ().length ());
+ ioc.rval (0);
+ break;
+ default:
+ return 0;
+ }
+ return ioc.rval ();
+ }
+
+ /* Performs canonical flushing at the ACE_Stream Head */
+
+ private int canonicalFlush (MessageBlock mb)
+ {
+ String s = mb.base ();
+ long f = (new Long (s)).longValue ();
+
+ if ((f & TaskFlags.ACE_FLUSHR) != 0)
+ {
+ this.flush (TaskFlags.ACE_FLUSHALL);
+ f &= ~TaskFlags.ACE_FLUSHR;
+ }
+ if ((f & TaskFlags.ACE_FLUSHW) != 0)
+ return this.reply (mb, null);
+ return 0;
+ }
+
+ // Will block forever to add the given MessageBlock
+ public int put (MessageBlock mb)
+ {
+ return this.put (mb, null);
+ }
+
+ // tv is absolute time
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ int res = 0;
+ if (mb.msgType () == MessageType.MB_IOCTL
+ && (res = this.control (mb)) == -1)
+ return res;
+
+ if (this.isWriter ())
+ {
+ return this.putNext (mb, tv);
+ }
+ else /* this.isReader () */
+ {
+ switch (mb.msgType ())
+ {
+ case MessageType.MB_FLUSH:
+ return this.canonicalFlush (mb);
+ default:
+ break;
+ }
+
+ try
+ {
+ return this.putq (mb, tv);
+ }
+ catch (InterruptedException e)
+ {
+ return -1;
+ }
+ }
+ }
+
+ public void dump ()
+ {
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+
+}
diff --git a/java/JACE/ASX/StreamTail.java b/java/JACE/ASX/StreamTail.java
new file mode 100644
index 00000000000..c1148a4c0f1
--- /dev/null
+++ b/java/JACE/ASX/StreamTail.java
@@ -0,0 +1,114 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * StreamTail.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+
+/**
+ * Standard module that acts as the tail of a ustream.
+ */
+public class StreamTail extends Task
+{
+ // Module that acts as the tail of a Stream.
+
+ public int open (Object obj)
+ {
+ return 0;
+ }
+
+ public int close (long l)
+ {
+ return 0;
+ }
+
+ public int svc ()
+ {
+ return -1;
+ }
+
+ private int control (MessageBlock mb)
+ {
+ IOCntlMsg ioc = (IOCntlMsg) mb.obj ();
+ int cmd = ioc.cmd ();
+
+ switch (cmd)
+ {
+ case IOCntlCmds.SET_LWM:
+ case IOCntlCmds.SET_HWM:
+ {
+ int size = mb.cont ().length ();
+
+ this.waterMarks (cmd, size);
+ this.sibling ().waterMarks (cmd, size);
+ ioc.rval (0);
+ break;
+ }
+ default:
+ mb.msgType (MessageType.MB_IOCNAK);
+ }
+ return this.reply (mb, null);
+ }
+
+ // Perform flush algorithm as though we were the driver
+ private int canonicalFlush (MessageBlock mb)
+ {
+ String s = mb.base ();
+ long f = (new Long (s)).longValue ();
+
+ if ((f & TaskFlags.ACE_FLUSHW) != 0)
+ {
+ this.flush (TaskFlags.ACE_FLUSHALL);
+ f &= ~TaskFlags.ACE_FLUSHW;
+ }
+ if ((f & TaskFlags.ACE_FLUSHR) != 0)
+ {
+ this.sibling ().flush (TaskFlags.ACE_FLUSHALL);
+ return this.reply (mb, null);
+ }
+ return 0;
+ }
+
+ // put the given MessageBlock without a timeout (block forever if
+ // necessary)
+ public int put (MessageBlock mb)
+ {
+ return this.put (mb, null);
+ }
+
+ // tv is an absolute time timeout
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ if (this.isWriter ())
+ {
+ switch (mb.msgType ())
+ {
+ case MessageType.MB_IOCTL:
+ return this.control (mb);
+ /* NOTREACHED */
+ default:
+ break;
+ }
+ }
+
+ return -1;
+ }
+
+ public void dump ()
+ {
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+
+}
diff --git a/java/JACE/ASX/Task.java b/java/JACE/ASX/Task.java
new file mode 100644
index 00000000000..b13de64f16a
--- /dev/null
+++ b/java/JACE/ASX/Task.java
@@ -0,0 +1,443 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * Task.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+import JACE.Reactor.*;
+import JACE.Concurrency.*;
+
+/**
+ * Primary interface for application message processing, as well
+ * as input and output message queueing. <P>
+ *
+ * This class serves as the basis for passive and active objects
+ * in ACE.
+ *
+ *@see MessageQueue
+ *@see EventHandler
+ */
+public abstract class Task implements Runnable, EventHandler
+{
+ // = Initialization/termination methods.
+
+ /**
+ * Initialize a Task. Note, we allocate a message queue ourselves.
+ */
+ public Task ()
+ {
+ this.msgQueue_ = new MessageQueue ();
+ this.thrMgr_ = null;
+ }
+
+ /**
+ * Initialize a Task. Note, we use the message queue and thread
+ * manager supplied by the user.
+ *@param mq Message Queue to hold list of messages on the Task
+ *@param thrMgr Thread Manager that manages all the spawned threads
+ */
+ public Task (MessageQueue mq,
+ ThreadManager thrMgr)
+ {
+ this.msgQueue_ = mq;
+ this.thrMgr_ = thrMgr;
+ }
+
+ /**
+ * Not meant to be invoked by the user directly!. This needs to be
+ * in the public interface in order to get invoked by Thread
+ * class.
+ */
+ public void run ()
+ {
+ this.svc ();
+ }
+
+ // = Initialization and termination hooks (note that these *must* be
+ // defined by subclasses).
+
+ /**
+ * Hook called to open a Task.
+ *@param obj used to pass arbitrary information
+ */
+ public abstract int open (Object obj);
+
+ /**
+ * Hook called to close a Task.
+ */
+ public abstract int close (long flags);
+
+ // = Immediate and deferred processing methods, respectively.
+
+ /**
+ * Transfer a message into the queue to handle immediate
+ * processing.
+ *@param mb Message Block to handle immediately
+ *@param tv Latest time to wait until (absolute time)
+ */
+ public abstract int put (MessageBlock mb, TimeValue tv);
+
+ /**
+ * Run by a daemon thread to handle deferred processing. Note, that
+ * to do anything useful, this method should be overriden by the
+ * subclass.
+ *@return default implementation always returns 0.
+ */
+ public int svc ()
+ {
+ return 0;
+ }
+
+ /**
+ * Set the underlying Thread Manager.
+ *@param t Thread Manager to use
+ */
+ public synchronized void thrMgr (ThreadManager t)
+ {
+ this.thrMgr_ = t;
+ }
+
+ /**
+ * Get the Thread Manager.
+ *@return Underlying Thread Manager
+ */
+ public synchronized ThreadManager thrMgr ()
+ {
+ return this.thrMgr_;
+ }
+
+ // = Active object method.
+
+ /**
+ * Turn the task into an active object. That is, having <nThreads>
+ * separate threads of control that all invoke Task::svc.
+ *@param flags Task Flags
+ *@param nThreads number of threads to spawn
+ *@param forceActive whether to force creation of new threads or not
+ *@return -1 if failure occurs, 1 if Task is already an active
+ * object and <forceActive> is false (doesn't *not* create a new
+ * thread in this case), and 0 if Task was not already an active
+ * object and a thread is created successfully or thread is an active
+ * object and <forceActive> is true.
+ */
+ public synchronized int activate (long flags, int nThreads, boolean forceActive)
+ {
+ // Create a Thread Manager if we do not already have one
+ if (this.thrMgr_ == null)
+ this.thrMgr_ = new ThreadManager ();
+
+ if (this.thrCount () > 0 && forceActive == false)
+ return 1; // Already active.
+ this.flags_ = flags;
+
+ if (ACE.BIT_ENABLED (flags, TaskFlags.THR_DAEMON))
+ this.thrMgr_.spawnN (nThreads, this, true); // Spawn off all threads as daemon threads
+ else // Spawn off all threads as normal threads
+ this.thrMgr_.spawnN (nThreads, this, false);
+
+ return 0;
+ }
+
+ // = Suspend/resume a Task
+
+ /**
+ * Suspend a task. Default implementation is a no-op.
+ */
+ public synchronized void suspend ()
+ {
+ }
+
+ /**
+ * Resume a suspended task. Default implementation is a no-op.
+ */
+ public synchronized void resume ()
+ {
+ }
+
+ /**
+ * Get the current group name.
+ *@return name of the current thread group
+ */
+ public synchronized String grpName ()
+ {
+ if (this.thrMgr_ != null)
+ return this.thrMgr_.thrGrp ().getName ();
+ else
+ return null;
+ }
+
+ /**
+ * Get the message queue associated with this task.
+ *@return the message queue associated with this task.
+ */
+ public MessageQueue msgQueue ()
+ {
+ return this.msgQueue_;
+ }
+
+ /**
+ * Set the message queue associated with this task.
+ *@param mq Message Queue to use with this Task.
+ */
+ public void msgQueue (MessageQueue mq)
+ {
+ this.msgQueue_ = mq;
+ }
+
+ /**
+ * Get the number of threads currently running within the Task.
+ *@return the number of threads currently running within the Task.
+ * 0 if we're a passive object, else > 0.
+ */
+ public synchronized int thrCount ()
+ {
+ if (this.thrMgr_ != null)
+ return this.thrMgr_.thrGrp ().activeCount ();
+ else
+ return 0;
+ }
+
+ /**
+ * Set the Task flags
+ *@param flags Task Flags
+ */
+ public synchronized void flags (long flags)
+ {
+ this.flags_ = flags;
+ }
+
+ /**
+ * Get the Task flags
+ *@return Task Flags
+ */
+ public synchronized long flags ()
+ {
+ return this.flags_;
+ }
+
+ // = Message queue manipulation methods.
+
+
+ /*
+ * Dump debug information.
+ */
+ public void dump ()
+ {
+ }
+
+ /**
+ * Insert a message into the queue, blocking forever if necessary.
+ *@param mb Message Block to insert
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ protected int putq (MessageBlock mb) throws InterruptedException
+ {
+ return this.putq(mb, null);
+ }
+
+ /**
+ * Insert message into the message queue.
+ *@param mb Message Block to insert into the Message Queue
+ *@param tv time to wait until (absolute time)
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ protected int putq (MessageBlock mb, TimeValue tv) throws InterruptedException
+ {
+ return this.msgQueue_.enqueueTail (mb, tv);
+ }
+
+ /**
+ * Extract the first message from the queue, blocking forever if
+ * necessary.
+ *@return the first Message Block from the Message Queue.
+ *@exception InterrupteException Interrupted while accessing queue
+ */
+ protected MessageBlock getq() throws InterruptedException
+ {
+ return this.getq(null);
+ }
+
+ /**
+ * Extract the first message from the queue. Note that the call is blocking.
+ *@return the first Message Block from the Message Queue.
+ *@param tv Latest time to wait until (absolute time)
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ protected MessageBlock getq (TimeValue tv) throws InterruptedException
+ {
+ return this.msgQueue_.dequeueHead (tv);
+ }
+
+ /**
+ * Return a message back to the queue.
+ *@param mb Message Block to return back to the Message Queue
+ *@param tv Latest time to wait until (absolute time)
+ *@exception java.lang.InterruptedException Interrupted while accessing queue
+ */
+ protected int ungetq (MessageBlock mb, TimeValue tv) throws InterruptedException
+ {
+ return this.msgQueue_.enqueueHead (mb, tv);
+ }
+
+ /**
+ * Transfer message to the adjacent ACETask in an ACEStream.
+ *@param mb Message Block to transfer to the adjacent Task
+ *@param tv Latest time to wait until (absolute time)
+ *@return -1 if there is no adjacent Task, else the return value of
+ * trying to put the Message Block on that Task's Message Queue.
+ */
+ protected int putNext (MessageBlock mb, TimeValue tv)
+ {
+ return this.next_ == null ? -1 : this.next_.put (mb, tv);
+ }
+
+ /**
+ * Turn the message back around. Puts the message in the sibling's
+ * Message Queue.
+ *@param mb Message Block to put into sibling's Message Queue
+ *@param tv Latest time to wait until (absolute time)
+ *@return -1 if there is no adjacent Task to the sibling, else the
+ * return value of trying to put the Message Block on sibling's
+ * Message Queue.
+ */
+ protected int reply (MessageBlock mb, TimeValue tv)
+ {
+ return this.sibling ().putNext (mb, tv);
+ }
+
+ // = ACE_Task utility routines to identify names et al.
+
+ /**
+ * Get the name of the enclosing Module.
+ *@return the name of the enclosing Module if there's one associated
+ * with the Task, else null.
+ */
+ protected String name ()
+ {
+ if (this.mod_ == null)
+ return null;
+ else
+ return this.mod_.name ();
+ }
+
+ /**
+ * Get the Task's sibling.
+ *@return the Task's sibling if there's one associated with the
+ * Task's Module, else null.
+ */
+ protected Task sibling ()
+ {
+ if (this.mod_ == null)
+ return null;
+ else
+ return this.mod_.sibling (this);
+ }
+
+ /**
+ * Set the Task's module.
+ *@param mod the Task's Module.
+ */
+ protected void module (Module mod)
+ {
+ this.mod_ = mod;
+ }
+
+ /**
+ * Get the Task's module.
+ *@return the Task's Module if there is one, else null.
+ */
+ protected Module module ()
+ {
+ return this.mod_;
+ }
+
+ /**
+ * Check if queue is a reader.
+ *@return true if queue is a reader, else false.
+ */
+ protected boolean isReader ()
+ {
+ return (ACE.BIT_ENABLED (this.flags_, TaskFlags.ACE_READER));
+ }
+
+ /**
+ * Check if queue is a writer.
+ *@return true if queue is a writer, else false.
+ */
+ protected boolean isWriter ()
+ {
+ return (ACE.BIT_DISABLED (this.flags_, TaskFlags.ACE_READER));
+ }
+
+ // = Pointers to next ACE_Queue (if ACE is part of an ACE_Stream).
+
+ /**
+ * Get next Task pointer.
+ *@return pointer to the next Task
+ */
+ protected Task next ()
+ {
+ return this.next_;
+ }
+
+ /**
+ * Set next Task pointer.
+ *@param task next task pointer
+ */
+ protected void next (Task task)
+ {
+ this.next_ = task;
+ }
+
+ // Special routines corresponding to certain message types.
+
+ /**
+ * Flush the Message Queue
+ *@return 0 if Message Queue is null, 1 if flush succeeds, -1 if
+ * ACE_FLUSHALL bit is not enabled in flags.
+ */
+ protected int flush (long flag)
+ {
+ if (ACE.BIT_ENABLED (flag, TaskFlags.ACE_FLUSHALL))
+ return (this.msgQueue_ == null ? 0 : 1);
+ else
+ return -1;
+ }
+
+
+ /**
+ * Manipulate watermarks.
+ *@param cmd IOCntlCmd
+ *@param size watermark
+ */
+ protected void waterMarks (int cmd, int size)
+ {
+ if (cmd == IOCntlCmds.SET_LWM)
+ this.msgQueue_.lowWaterMark (size);
+ else /* cmd == IOCntlMsg.SET_HWM */
+ this.msgQueue_.highWaterMark (size);
+ }
+
+ private ThreadManager thrMgr_ = null;
+ // Thread_Manager that manages all the spawned threads
+
+ private long flags_;
+ // Task flags.
+
+ private MessageQueue msgQueue_;
+ // List of messages on the Task..
+
+ private Task next_;
+ // Adjacent ACE_Task.
+
+ private Module mod_;
+ // Back-pointer to the enclosing module.
+}
diff --git a/java/JACE/ASX/TaskFlags.java b/java/JACE/ASX/TaskFlags.java
new file mode 100644
index 00000000000..13347283adf
--- /dev/null
+++ b/java/JACE/ASX/TaskFlags.java
@@ -0,0 +1,49 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * TaskFlags.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+/**
+ * Flags used within Task.
+ *
+ *@see Task
+ */
+public abstract class TaskFlags
+{
+ /** Identifies a Task as being the "reader" in a Module. */
+ public static final int ACE_READER = 01;
+
+ /** Just flush data messages in the queue. */
+ public static final int ACE_FLUSHDATA = 02;
+
+ /** Flush all messages in the Queue. */
+ public static final int ACE_FLUSHALL = 04;
+
+ /** Flush read queue */
+ public static final int ACE_FLUSHR = 010;
+
+ /** Flush write queue */
+ public static final int ACE_FLUSHW = 020;
+
+ /** Flush both queues */
+ public static final int ACE_FLUSHRW = 030;
+
+ /** Identifies a thread as suspended */
+ public static final int THR_SUSPENDED = 0x00000080;
+
+ /** Identifies a thread as a daemon thread */
+ public static final int THR_DAEMON = 0x00000100;
+
+ // Default private constructor to avoid instantiation
+ private TaskFlags ()
+ {
+ }
+}
diff --git a/java/JACE/ASX/ThruTask.java b/java/JACE/ASX/ThruTask.java
new file mode 100644
index 00000000000..3fd0bbd4476
--- /dev/null
+++ b/java/JACE/ASX/ThruTask.java
@@ -0,0 +1,44 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * ThruTask.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+/**
+ * Standard module that acts as a "no op", simply passing on all
+ * data to its adjacent neighbor.
+ */
+public class ThruTask extends Task
+{
+ public int open (Object obj)
+ {
+ return 0;
+ }
+
+ public int close (long flags)
+ {
+ return 0;
+ }
+
+ public int put (MessageBlock msg, TimeValue tv)
+ {
+ return this.putNext (msg, tv);
+ }
+
+ public int svc ()
+ {
+ return -1;
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+}
diff --git a/java/JACE/ASX/TimeValue.java b/java/JACE/ASX/TimeValue.java
new file mode 100644
index 00000000000..452f80447c4
--- /dev/null
+++ b/java/JACE/ASX/TimeValue.java
@@ -0,0 +1,296 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.Reactor
+ *
+ * = FILENAME
+ * TimeValue.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+//package JACE.Reactor;
+package JACE.ASX;
+
+/**
+ * Encapsulates a specific time or time interval. <P>
+ *
+ * Also provides methods for generating absolute times from
+ * relative times. This is used throughout JACE for timeouts.
+ *
+ *@see TimedWait
+ */
+public class TimeValue
+{
+ /**
+ * TimeValue representing 0 seconds and 0 nanoseconds.
+ */
+ public final static TimeValue zero = new TimeValue (0,0);
+
+ /**
+ * Default constructor. This creates a TimeValue that is
+ * equal to TimeValue.zero.
+ */
+ public TimeValue ()
+ {
+ this (0, 0);
+ }
+
+ /**
+ * Constructor
+ *@param sec seconds
+ */
+ public TimeValue (long sec)
+ {
+ this (sec, 0);
+ }
+
+ /**
+ * Constructor
+ *@param sec seconds
+ *@param nanos nanoseconds
+ */
+ public TimeValue (long sec, int nanos)
+ {
+ this.set (sec, nanos);
+ }
+
+ /**
+ * Sets the seconds and nanoseconds of Time Value
+ *@param sec seconds
+ *@param nanos nanoseconds
+ */
+ public void set (long sec, int nanos)
+ {
+ this.millisec_ = sec * 1000;
+ this.nanos_ = nanos;
+ this.normalize ();
+ }
+
+ /**
+ * Get seconds
+ *@return Seconds
+ */
+ public long sec ()
+ {
+ return this.millisec_/1000;
+ }
+
+ /**
+ * Get nanoseconds
+ *@return Nanoseconds
+ */
+ public int nanos ()
+ {
+ return this.nanos_;
+ }
+
+ /**
+ * Get time in milliseconds.
+ *@return time in milliseconds
+ */
+ public long getMilliTime ()
+ {
+ return this.millisec_;
+ }
+
+ /**
+ * Get a String representation of the Time Value.
+ *@return String representation of the Time Value
+ */
+ public String toString ()
+ {
+ return (new Long (this.millisec_/1000)).toString () + ":" +
+ (new Integer (this.nanos_)).toString ();
+ }
+
+ /**
+ * Get current time.
+ *@return the current system time as a new TimeValue
+ */
+ public static TimeValue getTimeOfDay ()
+ {
+ return new TimeValue (System.currentTimeMillis ()/1000);
+ }
+
+ /**
+ * Return a new TimeValue that represents the current system time
+ * of day offset by the given number of seconds and nanoseconds.
+ *@param sec Number of seconds to offset by
+ *@param nanos Number of nanoseconds to offset by
+ *@see JACE.ASX.TimeValue
+ *@return TimeValue for the system time plus the given offset
+ */
+ public static TimeValue relativeTimeOfDay(long sec, int nanos)
+ {
+ return new TimeValue ((System.currentTimeMillis() / 1000) + sec,
+ nanos);
+ }
+
+ /**
+ * Return a new TimeValue that represents the current system time
+ * of day offset by the given TimeValue.
+ *@param tv TimeValue to offset by
+ *@see JACE.ASX.TimeValue
+ *@return TimeValue for the system time plus the given offset
+ */
+ public static TimeValue relativeTimeOfDay(TimeValue offset)
+ {
+ return new TimeValue ((System.currentTimeMillis() / 1000) +
+ offset.sec(),
+ offset.nanos());
+ }
+
+ /**
+ * Compare two Time Values for equality.
+ *@param tv Time Value to compare with
+ *@return true if the two Time Values are equal, false otherwise
+ */
+ public boolean equals (TimeValue tv)
+ {
+ return this.millisec_ == (tv.sec () * 1000) && this.nanos_ == tv.nanos ();
+ }
+
+ /**
+ * Compare two Time Values for non-equality.
+ *@param tv Time Value to compare with
+ *@return true if the two Time Values are not equal, false otherwise
+ */
+ public boolean notEquals (TimeValue tv)
+ {
+ return !this.equals (tv);
+ }
+
+ /**
+ * Add two Time Values.
+ *@param tv1 The first Time Value
+ *@param tv2 The second Time Value
+ *@return sum of the two Time Values.
+ */
+ public static TimeValue plus (TimeValue tv1, TimeValue tv2)
+ {
+ TimeValue tv = new TimeValue (tv1.sec () + tv2.sec (),
+ tv1.nanos () + tv2.nanos ());
+ tv.normalize ();
+ return tv;
+ }
+
+ /**
+ * Subtract two Time Values.
+ *@param tv1 The first Time Value
+ *@param tv2 The second Time Value
+ *@return difference of the two Time Values.
+ */
+ public static TimeValue minus (TimeValue tv1, TimeValue tv2)
+ {
+ TimeValue tv = new TimeValue (tv1.sec () - tv2.sec (),
+ tv1.nanos () - tv2.nanos ());
+ tv.normalize ();
+ return tv;
+ }
+
+ /**
+ * Add Time Value to "this".
+ *@param tv The Time Value to add to this.
+ */
+ public void plusEquals (TimeValue tv)
+ {
+ this.set (this.sec () + tv.sec (),
+ this.nanos () + tv.nanos ());
+ this.normalize ();
+ }
+
+ /**
+ * Subtract Time Value from "this".
+ *@param tv The Time Value to subtract from this.
+ */
+ public void minusEquals (TimeValue tv)
+ {
+ this.set (this.sec () - tv.sec (),
+ this.nanos () - tv.nanos ());
+ this.normalize ();
+ }
+
+ /**
+ * Compare two Time Values for less than.
+ *@param tv Time Value to compare with
+ *@return true if "this" is less than tv, false otherwise
+ */
+ public boolean lessThan (TimeValue tv)
+ {
+ return tv.greaterThan (this);
+ }
+
+ /**
+ * Compare two Time Values for greater than.
+ *@param tv Time Value to compare with
+ *@return true if "this" is greater than tv, false otherwise
+ */
+ public boolean greaterThan (TimeValue tv)
+ {
+ if (this.sec () > tv.sec ())
+ return true;
+ else if (this.sec () == tv.sec ()
+ && this.nanos () > tv.nanos ())
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * Compare two Time Values for <=.
+ *@param tv Time Value to compare with
+ *@return true if "this" <= tv, false otherwise
+ */
+ public boolean lessThanEqual (TimeValue tv)
+ {
+ return tv.greaterThanEqual (this);
+ }
+
+ /**
+ * Compare two Time Values for >=.
+ *@param tv Time Value to compare with
+ *@return true if "this" >= tv, false otherwise
+ */
+ public boolean greaterThanEqual (TimeValue tv)
+ {
+ return this.sec () >= tv.sec () && this.nanos () >= tv.nanos ();
+ }
+
+ private void normalize ()
+ {
+ if (this.nanos_ >= ONE_MILLISECOND)
+ {
+ do
+ {
+ this.millisec_++;
+ this.nanos_ -= ONE_MILLISECOND;
+ }
+ while (this.nanos_ >= ONE_MILLISECOND);
+ }
+ else if (this.nanos_ <= -ONE_MILLISECOND)
+ {
+ do
+ {
+ this.millisec_--;
+ this.nanos_ += ONE_MILLISECOND;
+ }
+ while (this.nanos_ <= -ONE_MILLISECOND);
+ }
+
+ if (this.millisec_ >= 1 && this.nanos_ < 0)
+ {
+ this.millisec_--;
+ this.nanos_ += ONE_MILLISECOND;
+ }
+ else if (this.millisec_ < 0 && this.nanos_ > 0)
+ {
+ this.millisec_++;
+ this.nanos_ -= ONE_MILLISECOND;
+ }
+ }
+
+ private long millisec_;
+ private int nanos_;
+ private final static int ONE_MILLISECOND = 1000000;
+}
diff --git a/java/JACE/ASX/TimedWait.java b/java/JACE/ASX/TimedWait.java
new file mode 100644
index 00000000000..dc1d0bab673
--- /dev/null
+++ b/java/JACE/ASX/TimedWait.java
@@ -0,0 +1,157 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * TimedWait.java
+ *
+ *@author Prashant Jain and Doug Schmidt
+ *
+ *************************************************/
+package JACE.ASX;
+
+/**
+ * A wait/notify system with absolute time timeouts and built-in
+ * check of a condition. <P>
+ *
+ * Subclasses define the condition to check, and the object to
+ * wait on can be specified.
+ */
+public abstract class TimedWait
+{
+ /**
+ * Default Constructor. Sets "this" to be used for the delegation of
+ * the wait() call to.
+ */
+ public TimedWait ()
+ {
+ object_ = this;
+ }
+
+ /**
+ * Constructor. Allows subclasses to supply us with an Object that
+ * is delegated the wait() call.
+ *@param obj The Object that is delegated the wait() call.
+ */
+ public TimedWait (Object obj)
+ {
+ object_ = obj;
+ }
+
+ /**
+ * Hook method that needs to be implemented by subclasses.
+ */
+ public abstract boolean condition ();
+
+ /**
+ * Wait until condition becomes true. Note that the method
+ * blocks. Also note that this method is final to ensure that no one
+ * overrides it.
+ * IMPORTANT: This method assumes it is called with the object_'s
+ * monitor lock already held.
+ *@exception InterruptedException Interrupted during wait
+ */
+ public final void timedWait () throws InterruptedException
+ {
+ // Acquire the monitor lock.
+ if (!condition ())
+ {
+ // Only attempt to perform the wait if the condition isn't
+ // true initially.
+ for (;;)
+ {
+ // Wait until we are notified.
+ object_.wait ();
+
+ // Recheck the condition.
+ if (condition ())
+ break; // Condition became true.
+
+ // else we were falsely notified so go back into wait
+ }
+ }
+ }
+
+ /**
+ * Template Method that implements the actual timed wait. Note that
+ * this method is final to ensure that no one overrides it.
+ * IMPORTANT: This method assumes it is called with the object_'s
+ * monitor lock already held.
+ * If the specified wait time is zero, this checks the condition,
+ * then returns on success or throws a TimeoutException on failure.
+ *@param tv Absolute time to wait until before throwing an exception
+ * if the condition isn't satisfied
+ *@exception java.lang.InterruptedException Interrupted during wait
+ *@exception JACE.ASX.TimeoutException Reached timeout specified
+ */
+ public final void timedWait (TimeValue tv)
+ throws InterruptedException,
+ TimeoutException
+ {
+ if (tv == null) {
+ this.timedWait();
+ return;
+ }
+
+ // Acquire the monitor lock.
+ if (!condition ())
+ {
+ long start = System.currentTimeMillis();
+ long waitTime = tv.getMilliTime() - start;
+
+ for (;;) {
+
+ // Prevent a conversion from absolute to relative time from
+ // generating a zero or negative waitTime.
+ if (waitTime < 1)
+ throw new TimeoutException ();
+
+ // Wait until we are notified.
+ object_.wait (waitTime);
+
+ // Recheck the condition.
+ if (!condition ()) {
+
+ long now = System.currentTimeMillis();
+
+ // Timed out!
+ if (now >= tv.getMilliTime ())
+ throw new TimeoutException ();
+ else
+ // We still have some time left to wait, so adjust the
+ // wait_time.
+ waitTime = tv.getMilliTime() - now;
+ }
+ else
+ break; // Condition became true.
+ }
+ }
+ }
+
+ /**
+ * Notify any one thread waiting on the object_.
+ * IMPORTANT: This method assumes it is called with the object_'s
+ * monitor lock already held.
+ */
+ public final void signal () {
+ object_.notify ();
+ }
+
+ /**
+ * Notify all threads waiting on the object_.
+ * IMPORTANT: This method assumes it is called with the object_'s
+ * monitor lock already held.
+ */
+ public final void broadcast () {
+ object_.notifyAll ();
+ }
+
+ /**
+ * The object we delegate to. If a subclass gives us a particular
+ * object, we use that to delegate to, otherwise, we ``delegate''
+ * to ourself (i.e., this).
+ */
+ protected Object object_;
+
+}
diff --git a/java/JACE/ASX/TimeoutException.java b/java/JACE/ASX/TimeoutException.java
new file mode 100644
index 00000000000..d55cc4fe999
--- /dev/null
+++ b/java/JACE/ASX/TimeoutException.java
@@ -0,0 +1,37 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * TimeoutException.java
+ *
+ *@author Prashant Jain and Doug Schmidt
+ *
+ *************************************************/
+package JACE.ASX;
+
+/**
+ * Thrown when a timer has expired.
+ */
+public class TimeoutException extends Exception
+{
+ /**
+ * Default Constructor.
+ */
+ public TimeoutException ()
+ {
+ super ("Timed Out");
+ }
+
+ /**
+ * Constructor.
+ *@param timeout The timeout value which expired.
+ *@param desc Textual description of the exception
+ */
+ public TimeoutException (TimeValue timeout, String desc)
+ {
+ super ("Timed Out in " + timeout + ": " + desc);
+ }
+
+}
diff --git a/java/JACE/ASX/package.html b/java/JACE/ASX/package.html
new file mode 100644
index 00000000000..346782ed083
--- /dev/null
+++ b/java/JACE/ASX/package.html
@@ -0,0 +1,11 @@
+<!-- $Id$ -->
+<HTML>
+<BODY>
+Message queueing facilities.
+<P>
+@see <a href="http://www.cs.wustl.edu/~schmidt/ACE-papers.html#ipc">
+Documents on ACE interprocess communication components</a>
+@see <a href="http://www.cs.wustl.edu/~schmidt/ACE-papers.html#streams">
+Documents on the ACE streams framework</a>
+</BODY>
+</HTML>