diff options
author | eea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-24 23:09:41 +0000 |
---|---|---|
committer | eea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-24 23:09:41 +0000 |
commit | 8d677e61b41bfa9376a098c72aac8c633f051b82 (patch) | |
tree | 0f5d21862b09b96b4574fd520e7a25b92e1de059 /java | |
parent | a0ae352ca7060b8e09eae8f4fc577f8ceb689eff (diff) | |
download | ATCD-8d677e61b41bfa9376a098c72aac8c633f051b82.tar.gz |
Updated source files for ASX.
Diffstat (limited to 'java')
-rw-r--r-- | java/JACE/ASX/IOCntlCmds.java | 38 | ||||
-rw-r--r-- | java/JACE/ASX/IOCntlMsg.java | 124 | ||||
-rw-r--r-- | java/JACE/ASX/MessageBlock.java | 447 | ||||
-rw-r--r-- | java/JACE/ASX/MessageQueue.java | 633 | ||||
-rw-r--r-- | java/JACE/ASX/MessageType.java | 102 | ||||
-rw-r--r-- | java/JACE/ASX/Module.java | 246 | ||||
-rw-r--r-- | java/JACE/ASX/Stream.java | 436 | ||||
-rw-r--r-- | java/JACE/ASX/StreamHead.java | 123 | ||||
-rw-r--r-- | java/JACE/ASX/StreamTail.java | 114 | ||||
-rw-r--r-- | java/JACE/ASX/Task.java | 443 | ||||
-rw-r--r-- | java/JACE/ASX/TaskFlags.java | 49 | ||||
-rw-r--r-- | java/JACE/ASX/ThruTask.java | 44 | ||||
-rw-r--r-- | java/JACE/ASX/TimeValue.java | 296 | ||||
-rw-r--r-- | java/JACE/ASX/TimedWait.java | 157 | ||||
-rw-r--r-- | java/JACE/ASX/TimeoutException.java | 37 | ||||
-rw-r--r-- | java/JACE/ASX/package.html | 11 |
16 files changed, 3300 insertions, 0 deletions
diff --git a/java/JACE/ASX/IOCntlCmds.java b/java/JACE/ASX/IOCntlCmds.java new file mode 100644 index 00000000000..3cc7c76256d --- /dev/null +++ b/java/JACE/ASX/IOCntlCmds.java @@ -0,0 +1,38 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * TaskFlags.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +/** + * Constants used for IOCTL messages. + * + *@see JACE.ASX.IOCntlMsg + */ +public abstract class IOCntlCmds +{ + /** Set the low water mark. */ + public static final int SET_LWM = 1; + + /** Get the low water mark. */ + public static final int GET_LWM = 2; + + /** Set the high water mark. */ + public static final int SET_HWM = 3; + + /** Get the high water mark. */ + public static final int GET_HWM = 4; + + /** Link modules */ + public static final int MOD_LINK = 5; + + /** Unlink modules */ + public static final int MOD_UNLINK = 6; +} diff --git a/java/JACE/ASX/IOCntlMsg.java b/java/JACE/ASX/IOCntlMsg.java new file mode 100644 index 00000000000..4eea18b36f6 --- /dev/null +++ b/java/JACE/ASX/IOCntlMsg.java @@ -0,0 +1,124 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * IOCntlMsg.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import JACE.OS.*; + +/** + * Data format for IOCTL messages. + */ +public class IOCntlMsg +{ + + // = Initialization method. + + /* + * Initialize the control message. + *@param c IOCntlCmd for the control message. Note that this should + * be of type IOCntlCmds + */ + public IOCntlMsg (int c) + { + this.cmd_ = c; + } + + // = Get/set methods + + /* + * Get the command. + *@return the command. + */ + public int cmd () + { + return this.cmd_; + } + + /* + * Set the command. + *@param c the command. + */ + public void cmd (int c) + { + this.cmd_ = c; + } + + /* + * Get the count. + *@return the count. + */ + public int count () + { + return this.count_; + } + + /* + * Set the count. + *@param c the count. + */ + public void count (int c) + { + this.count_ = c; + } + + /* + * Get the error. + *@return the error. + */ + public int error () + { + return this.error_; + } + + /* + * Set the error. + *@param e the error. + */ + public void error (int e) + { + this.error_ = e; + } + + /* + * Get the return value. + *@return the return value. + */ + public int rval () + { + return this.rval_; + } + + /* + * Set the return value. + *@param r the return value. + */ + public void rval (int r) + { + this.rval_ = r; + } + + public String toString () + { + return (new Integer (this.cmd_)).toString (); + } + + private int cmd_; + // Command. + + private int count_; + // Count. + + private int error_; + // Error. + + private int rval_; + // Return value +} diff --git a/java/JACE/ASX/MessageBlock.java b/java/JACE/ASX/MessageBlock.java new file mode 100644 index 00000000000..4035e34a7bd --- /dev/null +++ b/java/JACE/ASX/MessageBlock.java @@ -0,0 +1,447 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * MessageBlock.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import JACE.OS.*; + +/** + * Object used to store messages in the ASX framework.<P> + * + * <tt>MessageBlock</tt> is modeled after the message data structures + * used in System V STREAMS. A <tt>MessageBlock</tt> is composed of + * one or more <tt>MessageBlock</tt>s that are linked together by + * <em>PREV</em> and <em>NEXT</em> pointers. In addition, a + * <tt>MessageBlock</tt> may also be linked to a chain of other + * <tt>MessageBlock</tt>s. This structure enables efficient manipulation + * of arbitrarily-large messages <em>without</em> incurring memory + * copying overhead. + * + *@see MessageQueue + */ +public class MessageBlock +{ + /** + * Create an empty Message Block + */ + public MessageBlock () + { + this (0); + } + + /** + * Create an empty Message Block. + * Note that this assumes that type of MessageBlock is MB_DATA. + *@param size size of the Message Block to create. + */ + public MessageBlock (int size) + { + // Note the explicit cast toString() is needed. For some strange + // reason, it fails otherwise if size == 0. + this ((new StringBuffer (size)).toString ()); + } + + /** + * Create a Message Block. Note that this assumes that type of + * MessageBlock is MB_DATA. + *@param data initial data to create a Message Block with. + */ + public MessageBlock (String data) + { + this (MessageType.MB_DATA, + null, + data); + } + + /** + * Create a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param data initial data to create Message Block with + */ + public MessageBlock (int type, + MessageBlock cont, + String data) + { + this.flags_ = 0; + this.priority_ = 0; + this.next_ = null; + this.prev_ = null; + + this.init (type, cont, data); + } + + /** + * Create a Message Block. Note that this assumes that type of + * MessageBlock is MB_OBJECT. + *@param obj initial object to create a Message Block with. + */ + public MessageBlock (Object obj) + { + this (MessageType.MB_OBJECT, + null, + obj); + } + + /** + * Create a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param obj initial object to create Message Block with + */ + public MessageBlock (int type, + MessageBlock cont, + Object obj) + { + this.init (type, cont, obj); + } + + /* Initialize the Message Block + *@param data data to initialize Message Block with + */ + public void init (String data) + { + this.base_ = new StringBuffer (data); + } + + /** + * Initialize a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param data data to initialize Message Block with + */ + public void init (int msgType, + MessageBlock msgCont, + String data) + { + if (data.length () == 0) + this.base_ = new StringBuffer (0); + else + this.base_ = new StringBuffer (data); + this.type_ = msgType; + this.cont_ = msgCont; + } + + /** + * Initialize a Message Block. Note that this assumes that type of + * MessageBlock is MB_OBJECT. + *@param obj initial object to initialize a Message Block with. + */ + public void init (Object obj) + { + this.init (MessageType.MB_OBJECT, null, obj); + } + + /** + * Initialize a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param obj object to initialize Message Block with + */ + public void init (int msgType, + MessageBlock msgCont, + Object obj) + { + this.obj_ = obj; + this.type_ = msgType; + this.cont_ = msgCont; + this.flags_ = 0; + this.priority_ = 0; + this.next_ = null; + this.prev_ = null; + } + + /** + * Set message flags. Note that the flags will be set on top of + * already set flags. + *@param moreFlags flags to set for the Message Block. + */ + public long setFlags (long moreFlags) + { + // Later we might mask more_flags so that user can't change + // internal ones: more_flags &= ~(USER_FLAGS -1). + this.flags_ = ACE.SET_BITS (this.flags_, moreFlags); + return this.flags_; + } + + /** + * Unset message flags. + *@param lessFlags flags to unset for the Message Block. + */ + public long clrFlags (long lessFlags) + { + // Later we might mask more_flags so that user can't change + // internal ones: less_flags &= ~(USER_FLAGS -1). + this.flags_ = ACE.CLR_BITS (this.flags_, lessFlags); + return this.flags_; + } + + /** + * Get the message flags. + *@return Message flags + */ + public long flags () + { + return this.flags_; + } + + /** + * Get the type of the message. + *@return message type + */ + public int msgType () + { + return this.type_; + } + + /** + * Set the type of the message. + *@param t type of the message + */ + public void msgType (int t) + { + this.type_ = t; + } + + /** + * Get the class of the message. Note there are two classes, + * <normal> messages and <high-priority> messages. + *@return message class + */ + public int msgClass () + { + return this.msgType () >= MessageType.MB_PRIORITY + ? MessageType.MB_PRIORITY : MessageType.MB_NORMAL; + } + + /** + * Find out if the message is a data message. + *@return true if message is a data message, false otherwise + */ + public boolean isDataMsg () + { + int mt = this.msgType (); + return mt == MessageType.MB_DATA + || mt == MessageType.MB_PROTO + || mt == MessageType.MB_PCPROTO; + } + + /** + * Find out if the message is an object message. + *@return true if message is an object message, false otherwise + */ + public boolean isObjMsg () + { + int mt = this.msgType (); + return mt == MessageType.MB_OBJECT + || mt == MessageType.MB_PROTO + || mt == MessageType.MB_PCPROTO; + } + + /** + * Get the priority of the message. + *@return message priority + */ + public long msgPriority () + { + return this.priority_; + } + + /** + * Set the priority of the message. + *@param pri priority of the message + */ + public void msgPriority (long pri) + { + this.priority_ = pri; + } + + /** + * Get message data. This assumes that msgType is MB_DATA. + *@return message data + */ + public String base () + { + // Create a String object to return + char temp[] = new char [this.base_.length ()]; + this.base_.getChars (0, this.base_.length (), temp, 0); + return new String (temp); + } + + /** + * Set the message data. This assumes that msgType is MB_DATA. + *@param data message data + *@param msgFlags message flags + */ + public void base (String data, + long msgFlags) + { + this.base_ = new StringBuffer (data); + this.flags_ = msgFlags; + } + + /** + * Get message object. This assumes that msgType is MB_OBJECT. + *@return message object + */ + public Object obj () + { + return this.obj_; + } + + /** + * Set the message object. This assumes that msgType is MB_OBJECT. + *@param object message object + *@param msgFlags message flags + */ + public void obj (Object obj, + long msgFlags) + { + this.obj_ = obj; + this.flags_ = msgFlags; + } + + // = The following four methods only make sense if the Message_Block + // is of type MB_DATA and not MB_OBJECT. + + /** + * Get length of the message. This method only makes sense if the + * MessageBlock is of type MB_DATA and not MB_OBJECT. + *@return length of the message. + */ + public int length () + { + return this.base_.length (); + } + + /** + * Set the length of the message. This method only makes sense if the + * MessageBlock is of type MB_DATA and not MB_OBJECT. + *@param n message length + */ + public void length (int n) + { + this.base_.setLength (n); + } + + /** + * Get size of the allocated buffer for the message. This method + * only makes sense if the MessageBlock is of type MB_DATA and not + * MB_OBJECT. + *@return size of the message buffer + */ + public int size () + { + return this.base_.capacity (); + } + + /** + * Set the total size of the buffer. This method will grow the + * buffer if need be. Also, this method only makes sense if the + * MessageBlock is of type MB_DATA and not MB_OBJECT. + *@param n size of message buffer + */ + public void size (int n) + { + this.base_.ensureCapacity (n); + } + + + /** + * Get the continuation field. The coninuation field is used to + * chain together composite messages. + *@return the continuation field + */ + public MessageBlock cont () + { + return this.cont_; + } + + /** + * Set the continuation field. The coninuation field is used to + * chain together composite messages. + *@param msgCont continuation field + */ + void cont (MessageBlock msgCont) + { + this.cont_ = msgCont; + } + + /** + * Get link to next message. The next message points to the + * <MessageBlock> directly ahead in the MessageQueue. + *@return next message block + */ + MessageBlock next () + { + return this.next_; + } + + /** + * Set link to next message. The next message points to the + * <MessageBlock> directly ahead in the MessageQueue. + *@param msgBlock next message block + */ + void next (MessageBlock msgBlock) + { + this.next_ = msgBlock; + } + + /** + * Get link to previous message. The previous message points to the + * <MessageBlock> directly before in the MessageQueue. + *@return previous message block + */ + MessageBlock prev () + { + return this.prev_; + } + + /** + * Set link to previous message. The previous message points to the + * <MessageBlock> directly before in the MessageQueue. + *@param msgBlock previous message block + */ + void prev (MessageBlock msgBlock) + { + this.prev_ = msgBlock; + } + + private int type_; + // Type of message. + + private long flags_; + // Misc flags. + + private long priority_; + // Priority of message. + + private StringBuffer base_; + // String data of message block (initialized to null). + + private Object obj_; + // Object data of message block (initialized to null). + + private MessageBlock cont_; + // Next message block in the chain. + + private MessageBlock next_; + // Next message in the list. + + private MessageBlock prev_; + // Previous message in the list. + +} + diff --git a/java/JACE/ASX/MessageQueue.java b/java/JACE/ASX/MessageQueue.java new file mode 100644 index 00000000000..df25870dd52 --- /dev/null +++ b/java/JACE/ASX/MessageQueue.java @@ -0,0 +1,633 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * MessageQueue.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import java.util.Date; +import JACE.OS.*; +import JACE.Reactor.*; + +class NotFullCondition extends TimedWait +{ + public NotFullCondition (MessageQueue mq) + { + super (mq); + this.mq_ = mq; + } + + public boolean condition () { + // Delegate to the appropriate conditional + // check on the MessageQueue. + return !this.mq_.isFull (); + } + private MessageQueue mq_; +} + +class NotEmptyCondition extends TimedWait +{ + public NotEmptyCondition (MessageQueue mq) + { + super (mq); + this.mq_ = mq; + } + + public boolean condition () { + // Delegate to the appropriate conditional + // check on the MessageQueue. + return !this.mq_.isEmpty (); + } + private MessageQueue mq_; +} + + +/** + * A thread-safe message queueing facility, modeled after the + * queueing facilities in System V StreamS. <P> + * + * <tt>MessageQueue</tt> is the central queueing facility for messages + * in the ASX framework. All operations are thread-safe, as it is intended + * to be used for inter-thread communication (<em>e.g.</em>, a producer and + * consumer thread joined by a <tt>MessageQueue</tt>). The queue + * consists of <tt>MessageBlock</tt>s. + *</blockquote> + * + *@see MessageBlock + *@see TimeValue + */ +public class MessageQueue +{ + /** + * Default constructor + */ + public MessageQueue () + { + this (DEFAULT_HWM, DEFAULT_LWM); + } + + /** + * Create a Message Queue with high and low water marks. + *@param hwm High water mark (max number of bytes allowed in the + * queue) + *@param lwm Low water mark (min number of bytes in the queue) + */ + public MessageQueue (int hwm, int lwm) + { + if (this.open (hwm, lwm) == -1) + ACE.ERROR ("open"); + } + + /** + * Initialize a Message Queue with high and low water marks. + *@param hwm High water mark (max number of bytes allowed in the + * queue) + *@param lwm Low water mark (min number of bytes in the queue) + */ + public synchronized int open (int hwm, int lwm) + { + this.highWaterMark_ = hwm; + this.lowWaterMark_ = lwm; + this.deactivated_ = false; + this.currentBytes_ = 0; + this.currentCount_ = 0; + this.tail_ = null; + this.head_ = null; + return 0; + } + + // ************ Note! *********** + // = For enqueue, enqueueHead, enqueueTail, and dequeueHead if + // timeout is specified, the caller will wait until the *absolute time* + // tv. Calls will return, however, when queue is closed, + // deactivated, or if it is past the time tv + + /** + * Enqueue a <MessageBlock> into the <MessageQueue> in accordance + * with its <msgPriority> (0 is lowest priority). Note that the + * call will block (unless the queue has been deactivated). + * + *@exception java.lang.InterruptedException Interrupted while accessing queue + *@param newItem item to enqueue onto the Message Queue + *@return -1 on failure, else the number of items still on the queue. + */ + public synchronized int enqueue (MessageBlock newItem) throws InterruptedException + { + return this.enqueue (newItem, null); + } + + /** + * Enqueue a <MessageBlock> into the <MessageQueue> in accordance + * with its <msgPriority> (0 is lowest priority). Note that the + * call will return if the queue has been deactivated or it is + * later than the specified absolute time value. + *@param newItem item to enqueue onto the Message Queue + *@param tv absolute TimeValue to timeout after + *@return -1 on failure, else the number of items still on the + * queue. + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + public synchronized int enqueue (MessageBlock newItem, + TimeValue tv) throws InterruptedException + { + int result = -1; + if (this.deactivated_) + return -1; + try + { + if (tv == null) // Need to do a blocking wait + notFullCondition_.timedWait (); + else // Need to do a timed wait + notFullCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return -1; + } + + // Check again if queue is still active + if (this.deactivated_) + return -1; + else + result = this.enqueueInternal (newItem); + + // Tell any blocked threads that the queue has a new item! + this.notEmptyCondition_.broadcast (); + return result; + } + + /** + * Enqueue a <MessageBlock> at the end of the <MessageQueue>. Note + * that the call will block (unless the queue has been deactivated). + *@param newItem item to enqueue onto the Message Queue + *@return -1 on failure, else the number of items still on the queue. + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + public synchronized int enqueueTail (MessageBlock newItem) throws InterruptedException + { + return this.enqueueTail (newItem, null); + } + + /** + * Enqueue a <MessageBlock> at the end of the <MessageQueue>. Note + * that the call will return when it's later than the given TimeValue or + * if the queue has been deactivated. + *@param newItem item to enqueue onto the Message Queue + *@param tv absolute TimeValue to wait until before returning (unless + * the operation compeltes before this time) + *@return -1 on failure, else the number of items still on the queue. + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + public synchronized int enqueueTail (MessageBlock newItem, + TimeValue tv) throws InterruptedException + { + int result = -1; + if (this.deactivated_) + return -1; + try + { + if (tv == null) // Need to do a blocking wait + notFullCondition_.timedWait (); + else // Need to do a timed wait + notFullCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return -1; + } + + // Check again if queue is still active + if (this.deactivated_) + return -1; + else + result = this.enqueueTailInternal (newItem); + + // Tell any blocked threads that the queue has a new item! + this.notEmptyCondition_.broadcast (); + return result; + } + + /** + * Enqueue a <MessageBlock> at the head of the <MessageQueue>. Note + * that the call will block (unless the queue has been deactivated). + *@param newItem item to enqueue onto the Message Queue + *@return -1 on failure, else the number of items still on the queue. + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + public synchronized int enqueueHead (MessageBlock newItem) throws InterruptedException + { + return this.enqueueHead (newItem, null); + } + + /** + * Enqueue a <MessageBlock> at the head of the <MessageQueue>. Note + * that the call will return when it's later than the given TimeValue or + * if the queue has been deactivated. + *@param newItem item to enqueue onto the Message Queue + *@param tv absolute TimeValue to wait until before returning (unless + * the operation completes before that time) + *@return -1 on failure, else the number of items still on the queue. + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + public synchronized int enqueueHead (MessageBlock newItem, + TimeValue tv) throws InterruptedException + { + int result = -1; + if (this.deactivated_) + return -1; + try + { + if (tv == null) // Need to do a blocking wait + notFullCondition_.timedWait (); + else // Need to do a timed wait + notFullCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return -1; + } + + // Check again if queue is still active + if (this.deactivated_) + return -1; + else + result = this.enqueueHeadInternal (newItem); + + // Tell any blocked threads that the queue has a new item! + this.notEmptyCondition_.broadcast (); + return result; + } + + /** + * Dequeue and return the <MessageBlock> at the head of the + * <MessageQueue>. Note that the call will block (unless the queue + * has been deactivated). + *@return null on failure, else the <MessageBlock> at the head of queue. + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + public synchronized MessageBlock dequeueHead () throws InterruptedException + { + return this.dequeueHead (null); + } + + /** + * Dequeue and return the <MessageBlock> at the head of the + * <MessageQueue>. Note that the call when return if the queue has + * been deactivated or when the current time is later than the given + * time value. + *@param tv absolute time timeout (blocks indefinitely if null) + *@return null on failure, else the <MessageBlock> at the head of queue. + *@exception InterruptedException Interrupted while accessing queue + */ + public synchronized MessageBlock dequeueHead (TimeValue tv) + throws InterruptedException + { + MessageBlock result = null; + if (this.deactivated_) + return null; + try + { + if (tv == null) // Need to do a blocking wait + notEmptyCondition_.timedWait (); + else // Need to do a timed wait + notEmptyCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return null; + } + + // Check again if queue is still active + if (this.deactivated_) + return null; + else + result = this.dequeueHeadInternal (); + + // Tell any blocked threads that the queue has room for an item! + this.notFullCondition_.broadcast (); + return result; + } + + /** + * Check if queue is full. + *@return true if queue is full, else false. + */ + public synchronized boolean isFull () + { + return this.isFullInternal (); + } + + /** + * Check if queue is empty. + *@return true if queue is empty, else false. + */ + public synchronized boolean isEmpty () + { + return this.isEmptyInternal (); + } + + /** + * Get total number of bytes on the queue. + *@return total number number of bytes on the queue + */ + public int messageBytes () + { + return this.currentBytes_; + } + + /** + * Get total number of messages on the queue. + *@return total number number of messages on the queue + */ + public int messageCount () + { + return this.currentCount_; + } + + // = Flow control routines + + /** + * Get high watermark. + *@return high watermark + */ + public int highWaterMark () + { + return this.highWaterMark_; + } + + /** + * Set high watermark. + *@param hwm high watermark + */ + public void highWaterMark (int hwm) + { + this.highWaterMark_ = hwm; + } + + /** + * Get low watermark. + *@return low watermark + */ + public int lowWaterMark () + { + return this.lowWaterMark_; + } + + /** + * Set low watermark. + *@param lwm low watermark + */ + public void lowWaterMark (int lwm) + { + this.lowWaterMark_ = lwm; + } + + // = Activation control methods. + + /** + * Deactivate the queue and wakeup all threads waiting on the queue + * so they can continue. No messages are removed from the queue, + * however. Any other operations called until the queue is + * activated again will immediately return -1. + *@return WAS_INACTIVE if queue was inactive before the call and + * WAS_ACTIVE if queue was active before the call. + */ + public synchronized int deactivate () + { + return this.deactivateInternal (); + } + + + /** + * Reactivate the queue so that threads can enqueue and dequeue + * messages again. + *@return WAS_INACTIVE if queue was inactive before the call and + * WAS_ACTIVE if queue was active before the call. + */ + public synchronized int activate () + { + return this.activateInternal (); + } + + protected boolean isEmptyInternal () + { + // Not sure about this one!!!! + return this.currentBytes_ <= this.lowWaterMark_ && this.currentCount_ <= 0; + } + + protected boolean isFullInternal () + { + return this.currentBytes_ > this.highWaterMark_; + } + + protected int deactivateInternal () + { + int currentStatus = + this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; + + this.notFullCondition_.broadcast (); + this.notEmptyCondition_.broadcast (); + + this.deactivated_ = true; + return currentStatus; + } + + protected int activateInternal () + { + int currentStatus = + this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; + this.deactivated_ = false; + + return currentStatus; + } + + protected int enqueueTailInternal (MessageBlock newItem) + { + if (newItem == null) + return -1; + + // List was empty, so build a new one. + if (this.tail_ == null) + { + this.head_ = newItem; + this.tail_ = newItem; + newItem.next (null); + newItem.prev (null); + } + // Link at the end. + else + { + newItem.next (null); + this.tail_.next (newItem); + newItem.prev (this.tail_); + this.tail_ = newItem; + } + + if (newItem.msgType() != MessageType.MB_OBJECT) + { + // Make sure to count *all* the bytes in a composite message!!! + for (MessageBlock temp = newItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ += temp.size (); + } + + this.currentCount_++; + return this.currentCount_; + } + + protected int enqueueHeadInternal (MessageBlock newItem) + { + if (newItem == null) + return -1; + + newItem.prev (null); + newItem.next (this.head_); + + if (this.head_ != null) + this.head_.prev (newItem); + else + this.tail_ = newItem; + + this.head_ = newItem; + + if (newItem.msgType() != MessageType.MB_OBJECT) + { + // Make sure to count *all* the bytes in a composite message!!! + for (MessageBlock temp = newItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ += temp.size (); + } + + this.currentCount_++; + + return this.currentCount_; + } + + protected int enqueueInternal (MessageBlock newItem) + { + if (newItem == null) + return -1; + + if (this.head_ == null) + // Check for simple case of an empty queue, where all we need to + // do is insert <newItem> into the head. + return this.enqueueHeadInternal (newItem); + else + { + MessageBlock temp; + + // Figure out where the new item goes relative to its priority. + + for (temp = this.head_; + temp != null; + temp = temp.next ()) + { + if (temp.msgPriority () <= newItem.msgPriority ()) + // Break out when we've located an item that has lower + // priority that <newItem>. + break; + } + + if (temp == null) + // Check for simple case of inserting at the end of the queue, + // where all we need to do is insert <newItem> after the + // current tail. + return this.enqueueTailInternal (newItem); + else if (temp.prev () == null) + // Check for simple case of inserting at the beginning of the + // queue, where all we need to do is insert <newItem> before + // the current head. + return this.enqueueHeadInternal (newItem); + else + { + // Insert the message right before the item of equal or lower + // priority. + newItem.next (temp); + newItem.prev (temp.prev ()); + temp.prev ().next (newItem); + temp.prev (newItem); + } + } + + if (newItem.msgType() != MessageType.MB_OBJECT) + { + // Make sure to count *all* the bytes in a composite message!!! + for (MessageBlock temp = newItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ += temp.size (); + } + + this.currentCount_++; + return this.currentCount_; + } + + protected MessageBlock dequeueHeadInternal () + { + MessageBlock firstItem = this.head_; + this.head_ = this.head_.next (); + + if (this.head_ == null) + this.tail_ = null; + + if (firstItem.msgType() != MessageType.MB_OBJECT) + { + // Make sure to subtract off all of the bytes associated with this + // message. + for (MessageBlock temp = firstItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ -= temp.size (); + } + + this.currentCount_--; + return firstItem; + } + + + /** Default high watermark (16 K). */ + public final static int DEFAULT_HWM = 16 * 1024; + + /** Default low watermark. */ + public final static int DEFAULT_LWM = 0; + + /** Message queue was active before activate() or deactivate(). */ + public final static int WAS_ACTIVE = 1; + + /** Message queue was inactive before activate() or deactivate(). */ + public final static int WAS_INACTIVE = 2; + + private int highWaterMark_; + // Greatest number of bytes before blocking. + + private int lowWaterMark_; + // Lowest number of bytes before unblocking occurs. + + private boolean deactivated_; + // Indicates that the queue is inactive. + + private int currentBytes_; + // Current number of bytes in the queue. + + private int currentCount_; + // Current number of messages in the queue. + + private MessageBlock head_; + // Head of Message_Block list. + + private MessageBlock tail_; + // Tail of Message_Block list. + + // The Delegated Notification mechanisms. + private NotFullCondition notFullCondition_ = new NotFullCondition (this); + private NotEmptyCondition notEmptyCondition_ = new NotEmptyCondition (this); + +} diff --git a/java/JACE/ASX/MessageType.java b/java/JACE/ASX/MessageType.java new file mode 100644 index 00000000000..97e33a6c6ba --- /dev/null +++ b/java/JACE/ASX/MessageType.java @@ -0,0 +1,102 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * MessageType.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +/** + * Message types used by MessageBlock. <P> + * + * Defines bit masks used to identify various types of messages.<P> + * + * This class is not intended to be instantiable. + * + *@see MessageBlock + */ +public class MessageType +{ + // = Data and protocol messages (regular and priority) + /** regular data */ + public static final int MB_DATA = 0x01; + + /** protocol control */ + public static final int MB_PROTO = 0x02; + + /** regular data */ + public static final int MB_OBJECT = 0x09; + + + // = Control messages (regular and priority) + /** line break */ + public static final int MB_BREAK = 0x03; + + /** pass file pointer */ + public static final int MB_PASSFP = 0x04; + + /** post an event to an event queue */ + public static final int MB_EVENT = 0x05; + + /** generate process signal */ + public static final int MB_SIG = 0x06; + + /** ioctl; set/get params */ + public static final int MB_IOCTL = 0x07; + + /** set various stream head options */ + public static final int MB_SETOPTS = 0x08; + + + // = Control messages (high priority; go to head of queue) + /** acknowledge ioctl */ + public static final int MB_IOCACK = 0x81; + + /** negative ioctl acknowledge */ + public static final int MB_IOCNAK = 0x82; + + /** priority proto message */ + public static final int MB_PCPROTO = 0x83; + + /** generate process signal */ + public static final int MB_PCSIG = 0x84; + + /** generate read notification */ + public static final int MB_READ = 0x85; + + /** flush your queues */ + public static final int MB_FLUSH = 0x86; + + /** stop transmission immediately */ + public static final int MB_STOP = 0x87; + + /** restart transmission after stop */ + public static final int MB_START = 0x88; + + /** line disconnect */ + public static final int MB_HANGUP = 0x89; + + /** fatal error used to set u.u_error */ + public static final int MB_ERROR = 0x8a; + + /** post an event to an event queue */ + public static final int MB_PCEVENT = 0x8b; + + + /** Normal priority messages */ + public static final int MB_NORMAL = 0x00; + + /** High priority control messages */ + public static final int MB_PRIORITY = 0x80; + + // Default private constructor to avoid instantiation + private MessageType () + { + } +} + diff --git a/java/JACE/ASX/Module.java b/java/JACE/ASX/Module.java new file mode 100644 index 00000000000..b6a3468cfd1 --- /dev/null +++ b/java/JACE/ASX/Module.java @@ -0,0 +1,246 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * Module.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import JACE.OS.*; + +/** + * Provides an abstraction for managing a bi-directional flow of + * messages. <P> + * + * This is based on the Module concept in System V Streams, + * which contains a pair of Tasks, one for handling upstream + * processing, one for handling downstream processing. + */ +public class Module +{ + // = Initialization and termination methods. + + /** + * Create an empty Module. + */ + public Module () + { + // Do nothing... + this.name ("<unknown>"); + } + + /* + * Create an initialized module. + *@param modName identity of the module. + *@param writerQ writer task of the module. + *@param readerQ reader task of the module. + *@param flags Module flags + */ + public Module (String modName, + Task writerQ, + Task readerQ, + Object flags) + { + this.open (modName, writerQ, readerQ, flags); + } + + /* + * Create an initialized module. + *@param modName identity of the module. + *@param writerQ writer task of the module. + *@param readerQ reader task of the module. + *@param flags Module flags + */ + public void open (String modName, + Task writerQ, + Task readerQ, + Object arg) + { + this.name (modName); + this.arg_ = arg; + + if (writerQ == null) + writerQ = new ThruTask (); + if (readerQ == null) + readerQ = new ThruTask (); + + this.reader (readerQ); + this.writer (writerQ); + + // Setup back pointers. + readerQ.module (this); + writerQ.module (this); + } + + + /* + * Set the writer task. + *@param q the writer task + */ + public void writer (Task q) + { + this.qPair_[1] = q; + if (q != null) + q.flags (ACE.CLR_BITS (q.flags (), TaskFlags.ACE_READER)); + } + + /* + * Set the reader task. + *@param q the reader task + */ + public void reader (Task q) + { + this.qPair_[0] = q; + if (q != null) + q.flags (ACE.SET_BITS (q.flags (), TaskFlags.ACE_READER)); + } + + /* + * Link this Module on top of Module. + *@param m the module to link this on top of. + */ + public void link (Module m) + { + this.next (m); + this.writer ().next (m.writer ()); + m.reader ().next (this.reader ()); + } + + /* + * Set and get pointer to sibling Task in Module. + *@param orig the task to get the sibling for + *@return the sibling of the task + */ + public Task sibling (Task orig) + { + if (this.qPair_[0] == orig) + return this.qPair_[1]; + else if (this.qPair_[1] == orig) + return this.qPair_[0]; + else + return null; + } + + /* + * Close down the module and its tasks. + *@param flags Module flags + *@return 0 on success, -1 on failure + */ + public int close (long flags) + { + Task readerQ = this.reader (); + Task writerQ = this.writer (); + int result = 0; + + if (readerQ != null) + { + if (readerQ.close (flags) == -1) + result = -1; + readerQ.flush (flags); + readerQ.next (null); + } + + if (writerQ != null) + { + if (writerQ.close (flags) == -1) + result = -1; + writerQ.flush (flags); + writerQ.next (null); + } + + return result; + } + + /* + * Get the argument passed to tasks. + *@return the argument passed to tasks. + */ + public Object arg () + { + return this.arg_; + } + + /* + * Set the argument to be passed to tasks. + *@param a the argument to be passed to tasks. + */ + public void arg (Object a) + { + this.arg_ = a; + } + + /* + * Get the name of the module. + *@return the name of the module. + */ + public String name () + { + return this.name_; + } + + /* + * Set the name of the module. + *@param n the name of the module. + */ + public void name (String n) + { + this.name_ = n; + } + + /* + * Get the writer task of the module. + *@return the writer task of the module. + */ + public Task writer () + { + return this.qPair_[1]; + } + + /* + * Get the reader task of the module. + *@return the reader task of the module. + */ + public Task reader () + { + return this.qPair_[0]; + } + + /* + * Get the next pointer to the module above in the stream. + *@return the next pointer to the module above in the stream. + */ + public Module next () + { + return this.next_; + } + + /* + * Set the next pointer to the module above in the stream. + *@param m the next pointer to the module above in the stream. + */ + public void next (Module m) + { + this.next_ = m; + } + + private Task qPair [] = new Task[2]; + // Pair of Tasks that form the "read-side" and "write-side" of the + // ACE_Module partitioning. + + private String name_ = null; + // Name of the ACE_Module. + + private Module next_; + // Next ACE_Module in the stack. + + private Object arg_; + // Argument passed through to the reader and writer task when they + // are opened. + +} + diff --git a/java/JACE/ASX/Stream.java b/java/JACE/ASX/Stream.java new file mode 100644 index 00000000000..6a968714ab7 --- /dev/null +++ b/java/JACE/ASX/Stream.java @@ -0,0 +1,436 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * Stream.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import JACE.OS.*; + +/** + * This class is the primary abstraction for the ASX framework. + * It is moduled after System V Stream. <P> + * + * A Stream consists of a stack of Modules, each of which + * contains two Tasks. + * + *@see Module + *@see Task + */ + +public class Stream +{ + + public Stream () + { + this (null, null, null); + } + + // Create a Stream consisting of <head> and <tail> as the Stream + // head and Stream tail, respectively. If these are 0 then the + // <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively. + // <arg> is the value past in to the open() methods of the tasks. + + public Stream (Object a, + Module head, + Module tail) + { + this.linkedUs_ = null; + // this.final_close_ = this.lock_; + + if (this.open (a, head, tail) == -1) + ACE.ERROR ("open" + head.name () + " " + tail.name ()); + } + + public int push (Module newTop) + { + if (this.pushModule (newTop, + this.streamHead_.next (), + this.streamHead_) == -1) + return -1; + else + return 0; + } + + // Note that the timeout tv is absolute time + public int put (MessageBlock mb, TimeValue tv) + { + return this.streamHead_.writer ().put (mb, tv); + } + + // Note that the timeout tv is absolute time + public MessageBlock get (TimeValue tv) throws InterruptedException + { + return this.streamHead_.reader ().getq (tv); + } + +// Return the "top" ACE_Module in a ACE_Stream, skipping over the +// stream_head. + + public Module top () + { + if (this.streamHead_.next () == this.streamTail_) + return null; + else + return this.streamHead_.next (); + } + +// Remove the "top" ACE_Module in a ACE_Stream, skipping over the +// stream_head. + + public int pop (long flags) + { + if (this.streamHead_.next () == this.streamTail_) + return -1; + else + { + // Skip over the ACE_Stream head. + Module top = this.streamHead_.next (); + Module newTop = top.next (); + + this.streamHead_.next (newTop); + + // Close the top ACE_Module. + + top.close (flags); + + this.streamHead_.writer ().next (newTop.writer ()); + newTop.reader ().next (this.streamHead_.reader ()); + + return 0; + } + } + +// Remove a named ACE_Module from an arbitrary place in the +// ACE_Stream. + + public int remove (String name, long flags) + { + Module prev = null; + + for (Module mod = this.streamHead_; + mod != null; mod = mod.next ()) + if (name.compareTo (mod.name ()) == 0) + { + if (prev == null) // Deleting ACE_Stream Head + this.streamHead_.link (mod.next ()); + else + prev.link (mod.next ()); + + mod.close (flags); + return 0; + } + else + prev = mod; + + return -1; + } + + public Module find (String name) + { + for (Module mod = this.streamHead_; + mod != null; + mod = mod.next ()) + if (name.compareTo (mod.name ()) == 0) + return mod; + + return null; + } + +// Actually push a module onto the stack... + + private int pushModule (Module newTop, + Module currentTop, + Module head) + { + Task ntReader = newTop.reader (); + Task ntWriter = newTop.writer (); + Task ctReader = null; + Task ctWriter = null; + + if (currentTop != null) + { + ctReader = currentTop.reader (); + ctWriter = currentTop.writer (); + ctReader.next (ntReader); + } + + ntWriter.next (ctWriter); + + if (head != null) + { + if (head != newTop) + head.link (newTop); + } + else + ntReader.next (null); + + newTop.next (currentTop); + + if (ntReader.open (newTop.arg ()) == -1) + return -1; + + if (ntWriter.open (newTop.arg ()) == -1) + return -1; + return 0; + } + + public synchronized int open (Object a, + Module head, + Module tail) + { + Task h1 = null, h2 = null; + Task t1 = null, t2 = null; + + if (head == null) + { + h1 = new StreamHead (); + h2 = new StreamHead (); + head = new Module ("ACEStreamHead", h1, h2, a); + } + + if (tail == null) + { + t1 = new StreamTail (); + t2 = new StreamTail (); + tail = new Module ("ACEStreamTail", + t1, t2, a); + } + + // Make sure *all* the allocation succeeded! + if (h1 == null || h2 == null || head == null + || t1 == null || t2 == null || tail == null) + { + // Close up! + head.close (0); + tail.close (0); + return -1; + } + + this.streamHead_ = head; + this.streamTail_ = tail; + + if (this.pushModule (this.streamTail_, + null, null) == -1) + return -1; + else if (this.pushModule (this.streamHead_, + this.streamTail_, + this.streamHead_) == -1) + return -1; + else + return 0; + } + + public synchronized int close (long flags) + { + if (this.streamHead_ != null + && this.streamTail_ != null) + { + // Don't bother checking return value here. + this.unlinkInternal (); + + int result = 0; + + // Remove and cleanup all the intermediate modules. + + while (this.streamHead_.next () != this.streamTail_) + { + if (this.pop (flags) == -1) + result = -1; + } + + // Clean up the head and tail of the stream. + if (this.streamHead_.close (flags) == -1) + result = -1; + if (this.streamTail_.close (flags) == -1) + result = -1; + + this.streamHead_ = null; + this.streamTail_ = null; + + // Tell all threads waiting on the close that we are done. + // this.final_close_.broadcast (); + return result; + } + return 0; + } + + public int control (int cmd, Object a) throws InterruptedException + { + IOCntlMsg ioc = new IOCntlMsg (cmd); + + // Create a data block that contains the user-supplied data. + MessageBlock db = + new MessageBlock (MessageType.MB_IOCTL, + null, + a); + + // Create a control block that contains the control field and a + // pointer to the data block. + MessageBlock cb = + new MessageBlock (MessageType.MB_IOCTL, + db, + (Object) ioc); + + int result = 0; + + if (this.streamHead_.writer ().put (cb, null) == -1) + result = -1; + else if ((cb = this.streamHead_.reader ().getq (null)) == null) + result = -1; + else + result = ((IOCntlMsg ) cb.obj ()).rval (); + + return result; + } + +// Link two streams together at their bottom-most Modules (i.e., the +// one just above the Stream tail). Note that all of this is premised +// on the fact that the Stream head and Stream tail are non-NULL... +// This must be called with locks held. + + private int linkInternal (Stream us) + { + this.linkedUs_ = us; + // Make sure the other side is also linked to us! + us.linkedUs_ = this; + + Module myTail = this.streamHead_; + + if (myTail == null) + return -1; + + // Locate the module just above our Stream tail. + while (myTail.next () != this.streamTail_) + myTail = myTail.next (); + + Module otherTail = us.streamHead_; + + if (otherTail == null) + return -1; + + // Locate the module just above the other Stream's tail. + while (otherTail.next () != us.streamTail_) + otherTail = otherTail.next (); + + // Reattach the pointers so that the two streams are linked! + myTail.writer ().next (otherTail.reader ()); + otherTail.writer ().next (myTail.reader ()); + return 0; + } + + public synchronized int link (Stream us) + { + return this.linkInternal (us); + } + +// Must be called with locks held... + + private int unlinkInternal () + { + // Only try to unlink if we are in fact still linked! + + if (this.linkedUs_ != null) + { + Module myTail = this.streamHead_; + + // Only relink if we still exist! + if (myTail != null) + { + // Find the module that's just before our stream tail. + while (myTail.next () != this.streamTail_) + myTail = myTail.next (); + + // Restore the writer's next() link to our tail. + myTail.writer ().next (this.streamTail_.writer ()); + } + + Module otherTail = this.linkedUs_.streamHead_; + + // Only fiddle with the other side if it in fact still remains. + if (otherTail != null) + { + while (otherTail.next () != this.linkedUs_.streamTail_) + otherTail = otherTail.next (); + + otherTail.writer ().next (this.linkedUs_.streamTail_.writer ()); + + } + + // Make sure the other side is also aware that it's been unlinked! + this.linkedUs_.linkedUs_ = null; + + this.linkedUs_ = null; + return 0; + } + else + return -1; + } + + public synchronized int unlink () + { + return this.unlinkInternal (); + } + + public void dump () + { + ACE.DEBUG ("-------- module links --------"); + + for (Module mp = this.streamHead_; ; mp = mp.next ()) + { + ACE.DEBUG ("module name = " + mp.name ()); + if (mp == this.streamTail_) + break; + } + + ACE.DEBUG ("-------- writer links --------"); + + Task tp; + + for (tp = this.streamHead_.writer (); ; tp = tp.next ()) + { + ACE.DEBUG ("writer queue name = " + tp.name ()); + tp.dump (); + ACE.DEBUG ("-------\n"); + if (tp == this.streamTail_.writer () + || (this.linkedUs_ != null && tp == this.linkedUs_.streamHead_.reader ())) + break; + } + + ACE.DEBUG ("-------- reader links --------\n"); + for (tp = this.streamTail_.reader (); ; tp = tp.next ()) + { + ACE.DEBUG ("reader queue name = " + tp.name ()); + tp.dump (); + ACE.DEBUG ("-------\n"); + if (tp == this.streamHead_.reader () + || (this.linkedUs_ != null && tp == this.linkedUs_.streamHead_.writer ())) + break; + } + } + + Module streamHead_ = null; + // Pointer to the head of the stream. + + Module streamTail_ = null; + // Pointer to the tail of the stream. + + Stream linkedUs_ = null; + // Pointer to an adjoining linked stream. + + // = Synchronization objects used for thread-safe streams. + // ACE_SYNCH_MUTEX lock_; + // Protect the stream against race conditions. + + // ACE_SYNCH_CONDITION final_close_; + // Use to tell all threads waiting on the close that we are done. + +} + + diff --git a/java/JACE/ASX/StreamHead.java b/java/JACE/ASX/StreamHead.java new file mode 100644 index 00000000000..1492b43a297 --- /dev/null +++ b/java/JACE/ASX/StreamHead.java @@ -0,0 +1,123 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * StreamHead.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import JACE.OS.*; + +/** + * Standard module that acts as the head of a ustream. + */ + +public class StreamHead extends Task +{ + // Module that acts as the head of a Stream. + + public int open (Object obj) + { + return 0; + } + + public int close (long l) + { + return 0; + } + + public int svc () + { + return -1; + } + + private int control (MessageBlock mb) + { + + IOCntlMsg ioc = (IOCntlMsg) mb.obj (); + int cmd = ioc.cmd (); + + switch (cmd) + { + case IOCntlCmds.SET_LWM: + case IOCntlCmds.SET_HWM: + this.waterMarks (cmd, mb.cont ().length ()); + ioc.rval (0); + break; + default: + return 0; + } + return ioc.rval (); + } + + /* Performs canonical flushing at the ACE_Stream Head */ + + private int canonicalFlush (MessageBlock mb) + { + String s = mb.base (); + long f = (new Long (s)).longValue (); + + if ((f & TaskFlags.ACE_FLUSHR) != 0) + { + this.flush (TaskFlags.ACE_FLUSHALL); + f &= ~TaskFlags.ACE_FLUSHR; + } + if ((f & TaskFlags.ACE_FLUSHW) != 0) + return this.reply (mb, null); + return 0; + } + + // Will block forever to add the given MessageBlock + public int put (MessageBlock mb) + { + return this.put (mb, null); + } + + // tv is absolute time + public int put (MessageBlock mb, TimeValue tv) + { + int res = 0; + if (mb.msgType () == MessageType.MB_IOCTL + && (res = this.control (mb)) == -1) + return res; + + if (this.isWriter ()) + { + return this.putNext (mb, tv); + } + else /* this.isReader () */ + { + switch (mb.msgType ()) + { + case MessageType.MB_FLUSH: + return this.canonicalFlush (mb); + default: + break; + } + + try + { + return this.putq (mb, tv); + } + catch (InterruptedException e) + { + return -1; + } + } + } + + public void dump () + { + } + + public int handleTimeout (TimeValue tv, Object obj) + { + return 0; + } + +} diff --git a/java/JACE/ASX/StreamTail.java b/java/JACE/ASX/StreamTail.java new file mode 100644 index 00000000000..c1148a4c0f1 --- /dev/null +++ b/java/JACE/ASX/StreamTail.java @@ -0,0 +1,114 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * StreamTail.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import JACE.OS.*; + +/** + * Standard module that acts as the tail of a ustream. + */ +public class StreamTail extends Task +{ + // Module that acts as the tail of a Stream. + + public int open (Object obj) + { + return 0; + } + + public int close (long l) + { + return 0; + } + + public int svc () + { + return -1; + } + + private int control (MessageBlock mb) + { + IOCntlMsg ioc = (IOCntlMsg) mb.obj (); + int cmd = ioc.cmd (); + + switch (cmd) + { + case IOCntlCmds.SET_LWM: + case IOCntlCmds.SET_HWM: + { + int size = mb.cont ().length (); + + this.waterMarks (cmd, size); + this.sibling ().waterMarks (cmd, size); + ioc.rval (0); + break; + } + default: + mb.msgType (MessageType.MB_IOCNAK); + } + return this.reply (mb, null); + } + + // Perform flush algorithm as though we were the driver + private int canonicalFlush (MessageBlock mb) + { + String s = mb.base (); + long f = (new Long (s)).longValue (); + + if ((f & TaskFlags.ACE_FLUSHW) != 0) + { + this.flush (TaskFlags.ACE_FLUSHALL); + f &= ~TaskFlags.ACE_FLUSHW; + } + if ((f & TaskFlags.ACE_FLUSHR) != 0) + { + this.sibling ().flush (TaskFlags.ACE_FLUSHALL); + return this.reply (mb, null); + } + return 0; + } + + // put the given MessageBlock without a timeout (block forever if + // necessary) + public int put (MessageBlock mb) + { + return this.put (mb, null); + } + + // tv is an absolute time timeout + public int put (MessageBlock mb, TimeValue tv) + { + if (this.isWriter ()) + { + switch (mb.msgType ()) + { + case MessageType.MB_IOCTL: + return this.control (mb); + /* NOTREACHED */ + default: + break; + } + } + + return -1; + } + + public void dump () + { + } + + public int handleTimeout (TimeValue tv, Object obj) + { + return 0; + } + +} diff --git a/java/JACE/ASX/Task.java b/java/JACE/ASX/Task.java new file mode 100644 index 00000000000..b13de64f16a --- /dev/null +++ b/java/JACE/ASX/Task.java @@ -0,0 +1,443 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * Task.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +import JACE.OS.*; +import JACE.Reactor.*; +import JACE.Concurrency.*; + +/** + * Primary interface for application message processing, as well + * as input and output message queueing. <P> + * + * This class serves as the basis for passive and active objects + * in ACE. + * + *@see MessageQueue + *@see EventHandler + */ +public abstract class Task implements Runnable, EventHandler +{ + // = Initialization/termination methods. + + /** + * Initialize a Task. Note, we allocate a message queue ourselves. + */ + public Task () + { + this.msgQueue_ = new MessageQueue (); + this.thrMgr_ = null; + } + + /** + * Initialize a Task. Note, we use the message queue and thread + * manager supplied by the user. + *@param mq Message Queue to hold list of messages on the Task + *@param thrMgr Thread Manager that manages all the spawned threads + */ + public Task (MessageQueue mq, + ThreadManager thrMgr) + { + this.msgQueue_ = mq; + this.thrMgr_ = thrMgr; + } + + /** + * Not meant to be invoked by the user directly!. This needs to be + * in the public interface in order to get invoked by Thread + * class. + */ + public void run () + { + this.svc (); + } + + // = Initialization and termination hooks (note that these *must* be + // defined by subclasses). + + /** + * Hook called to open a Task. + *@param obj used to pass arbitrary information + */ + public abstract int open (Object obj); + + /** + * Hook called to close a Task. + */ + public abstract int close (long flags); + + // = Immediate and deferred processing methods, respectively. + + /** + * Transfer a message into the queue to handle immediate + * processing. + *@param mb Message Block to handle immediately + *@param tv Latest time to wait until (absolute time) + */ + public abstract int put (MessageBlock mb, TimeValue tv); + + /** + * Run by a daemon thread to handle deferred processing. Note, that + * to do anything useful, this method should be overriden by the + * subclass. + *@return default implementation always returns 0. + */ + public int svc () + { + return 0; + } + + /** + * Set the underlying Thread Manager. + *@param t Thread Manager to use + */ + public synchronized void thrMgr (ThreadManager t) + { + this.thrMgr_ = t; + } + + /** + * Get the Thread Manager. + *@return Underlying Thread Manager + */ + public synchronized ThreadManager thrMgr () + { + return this.thrMgr_; + } + + // = Active object method. + + /** + * Turn the task into an active object. That is, having <nThreads> + * separate threads of control that all invoke Task::svc. + *@param flags Task Flags + *@param nThreads number of threads to spawn + *@param forceActive whether to force creation of new threads or not + *@return -1 if failure occurs, 1 if Task is already an active + * object and <forceActive> is false (doesn't *not* create a new + * thread in this case), and 0 if Task was not already an active + * object and a thread is created successfully or thread is an active + * object and <forceActive> is true. + */ + public synchronized int activate (long flags, int nThreads, boolean forceActive) + { + // Create a Thread Manager if we do not already have one + if (this.thrMgr_ == null) + this.thrMgr_ = new ThreadManager (); + + if (this.thrCount () > 0 && forceActive == false) + return 1; // Already active. + this.flags_ = flags; + + if (ACE.BIT_ENABLED (flags, TaskFlags.THR_DAEMON)) + this.thrMgr_.spawnN (nThreads, this, true); // Spawn off all threads as daemon threads + else // Spawn off all threads as normal threads + this.thrMgr_.spawnN (nThreads, this, false); + + return 0; + } + + // = Suspend/resume a Task + + /** + * Suspend a task. Default implementation is a no-op. + */ + public synchronized void suspend () + { + } + + /** + * Resume a suspended task. Default implementation is a no-op. + */ + public synchronized void resume () + { + } + + /** + * Get the current group name. + *@return name of the current thread group + */ + public synchronized String grpName () + { + if (this.thrMgr_ != null) + return this.thrMgr_.thrGrp ().getName (); + else + return null; + } + + /** + * Get the message queue associated with this task. + *@return the message queue associated with this task. + */ + public MessageQueue msgQueue () + { + return this.msgQueue_; + } + + /** + * Set the message queue associated with this task. + *@param mq Message Queue to use with this Task. + */ + public void msgQueue (MessageQueue mq) + { + this.msgQueue_ = mq; + } + + /** + * Get the number of threads currently running within the Task. + *@return the number of threads currently running within the Task. + * 0 if we're a passive object, else > 0. + */ + public synchronized int thrCount () + { + if (this.thrMgr_ != null) + return this.thrMgr_.thrGrp ().activeCount (); + else + return 0; + } + + /** + * Set the Task flags + *@param flags Task Flags + */ + public synchronized void flags (long flags) + { + this.flags_ = flags; + } + + /** + * Get the Task flags + *@return Task Flags + */ + public synchronized long flags () + { + return this.flags_; + } + + // = Message queue manipulation methods. + + + /* + * Dump debug information. + */ + public void dump () + { + } + + /** + * Insert a message into the queue, blocking forever if necessary. + *@param mb Message Block to insert + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + protected int putq (MessageBlock mb) throws InterruptedException + { + return this.putq(mb, null); + } + + /** + * Insert message into the message queue. + *@param mb Message Block to insert into the Message Queue + *@param tv time to wait until (absolute time) + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + protected int putq (MessageBlock mb, TimeValue tv) throws InterruptedException + { + return this.msgQueue_.enqueueTail (mb, tv); + } + + /** + * Extract the first message from the queue, blocking forever if + * necessary. + *@return the first Message Block from the Message Queue. + *@exception InterrupteException Interrupted while accessing queue + */ + protected MessageBlock getq() throws InterruptedException + { + return this.getq(null); + } + + /** + * Extract the first message from the queue. Note that the call is blocking. + *@return the first Message Block from the Message Queue. + *@param tv Latest time to wait until (absolute time) + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + protected MessageBlock getq (TimeValue tv) throws InterruptedException + { + return this.msgQueue_.dequeueHead (tv); + } + + /** + * Return a message back to the queue. + *@param mb Message Block to return back to the Message Queue + *@param tv Latest time to wait until (absolute time) + *@exception java.lang.InterruptedException Interrupted while accessing queue + */ + protected int ungetq (MessageBlock mb, TimeValue tv) throws InterruptedException + { + return this.msgQueue_.enqueueHead (mb, tv); + } + + /** + * Transfer message to the adjacent ACETask in an ACEStream. + *@param mb Message Block to transfer to the adjacent Task + *@param tv Latest time to wait until (absolute time) + *@return -1 if there is no adjacent Task, else the return value of + * trying to put the Message Block on that Task's Message Queue. + */ + protected int putNext (MessageBlock mb, TimeValue tv) + { + return this.next_ == null ? -1 : this.next_.put (mb, tv); + } + + /** + * Turn the message back around. Puts the message in the sibling's + * Message Queue. + *@param mb Message Block to put into sibling's Message Queue + *@param tv Latest time to wait until (absolute time) + *@return -1 if there is no adjacent Task to the sibling, else the + * return value of trying to put the Message Block on sibling's + * Message Queue. + */ + protected int reply (MessageBlock mb, TimeValue tv) + { + return this.sibling ().putNext (mb, tv); + } + + // = ACE_Task utility routines to identify names et al. + + /** + * Get the name of the enclosing Module. + *@return the name of the enclosing Module if there's one associated + * with the Task, else null. + */ + protected String name () + { + if (this.mod_ == null) + return null; + else + return this.mod_.name (); + } + + /** + * Get the Task's sibling. + *@return the Task's sibling if there's one associated with the + * Task's Module, else null. + */ + protected Task sibling () + { + if (this.mod_ == null) + return null; + else + return this.mod_.sibling (this); + } + + /** + * Set the Task's module. + *@param mod the Task's Module. + */ + protected void module (Module mod) + { + this.mod_ = mod; + } + + /** + * Get the Task's module. + *@return the Task's Module if there is one, else null. + */ + protected Module module () + { + return this.mod_; + } + + /** + * Check if queue is a reader. + *@return true if queue is a reader, else false. + */ + protected boolean isReader () + { + return (ACE.BIT_ENABLED (this.flags_, TaskFlags.ACE_READER)); + } + + /** + * Check if queue is a writer. + *@return true if queue is a writer, else false. + */ + protected boolean isWriter () + { + return (ACE.BIT_DISABLED (this.flags_, TaskFlags.ACE_READER)); + } + + // = Pointers to next ACE_Queue (if ACE is part of an ACE_Stream). + + /** + * Get next Task pointer. + *@return pointer to the next Task + */ + protected Task next () + { + return this.next_; + } + + /** + * Set next Task pointer. + *@param task next task pointer + */ + protected void next (Task task) + { + this.next_ = task; + } + + // Special routines corresponding to certain message types. + + /** + * Flush the Message Queue + *@return 0 if Message Queue is null, 1 if flush succeeds, -1 if + * ACE_FLUSHALL bit is not enabled in flags. + */ + protected int flush (long flag) + { + if (ACE.BIT_ENABLED (flag, TaskFlags.ACE_FLUSHALL)) + return (this.msgQueue_ == null ? 0 : 1); + else + return -1; + } + + + /** + * Manipulate watermarks. + *@param cmd IOCntlCmd + *@param size watermark + */ + protected void waterMarks (int cmd, int size) + { + if (cmd == IOCntlCmds.SET_LWM) + this.msgQueue_.lowWaterMark (size); + else /* cmd == IOCntlMsg.SET_HWM */ + this.msgQueue_.highWaterMark (size); + } + + private ThreadManager thrMgr_ = null; + // Thread_Manager that manages all the spawned threads + + private long flags_; + // Task flags. + + private MessageQueue msgQueue_; + // List of messages on the Task.. + + private Task next_; + // Adjacent ACE_Task. + + private Module mod_; + // Back-pointer to the enclosing module. +} diff --git a/java/JACE/ASX/TaskFlags.java b/java/JACE/ASX/TaskFlags.java new file mode 100644 index 00000000000..13347283adf --- /dev/null +++ b/java/JACE/ASX/TaskFlags.java @@ -0,0 +1,49 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * TaskFlags.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +/** + * Flags used within Task. + * + *@see Task + */ +public abstract class TaskFlags +{ + /** Identifies a Task as being the "reader" in a Module. */ + public static final int ACE_READER = 01; + + /** Just flush data messages in the queue. */ + public static final int ACE_FLUSHDATA = 02; + + /** Flush all messages in the Queue. */ + public static final int ACE_FLUSHALL = 04; + + /** Flush read queue */ + public static final int ACE_FLUSHR = 010; + + /** Flush write queue */ + public static final int ACE_FLUSHW = 020; + + /** Flush both queues */ + public static final int ACE_FLUSHRW = 030; + + /** Identifies a thread as suspended */ + public static final int THR_SUSPENDED = 0x00000080; + + /** Identifies a thread as a daemon thread */ + public static final int THR_DAEMON = 0x00000100; + + // Default private constructor to avoid instantiation + private TaskFlags () + { + } +} diff --git a/java/JACE/ASX/ThruTask.java b/java/JACE/ASX/ThruTask.java new file mode 100644 index 00000000000..3fd0bbd4476 --- /dev/null +++ b/java/JACE/ASX/ThruTask.java @@ -0,0 +1,44 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * ThruTask.java + * + *@author Prashant Jain + * + *************************************************/ +package JACE.ASX; + +/** + * Standard module that acts as a "no op", simply passing on all + * data to its adjacent neighbor. + */ +public class ThruTask extends Task +{ + public int open (Object obj) + { + return 0; + } + + public int close (long flags) + { + return 0; + } + + public int put (MessageBlock msg, TimeValue tv) + { + return this.putNext (msg, tv); + } + + public int svc () + { + return -1; + } + + public int handleTimeout (TimeValue tv, Object obj) + { + return 0; + } +} diff --git a/java/JACE/ASX/TimeValue.java b/java/JACE/ASX/TimeValue.java new file mode 100644 index 00000000000..452f80447c4 --- /dev/null +++ b/java/JACE/ASX/TimeValue.java @@ -0,0 +1,296 @@ +/************************************************* + * + * = PACKAGE + * JACE.Reactor + * + * = FILENAME + * TimeValue.java + * + *@author Prashant Jain + * + *************************************************/ +//package JACE.Reactor; +package JACE.ASX; + +/** + * Encapsulates a specific time or time interval. <P> + * + * Also provides methods for generating absolute times from + * relative times. This is used throughout JACE for timeouts. + * + *@see TimedWait + */ +public class TimeValue +{ + /** + * TimeValue representing 0 seconds and 0 nanoseconds. + */ + public final static TimeValue zero = new TimeValue (0,0); + + /** + * Default constructor. This creates a TimeValue that is + * equal to TimeValue.zero. + */ + public TimeValue () + { + this (0, 0); + } + + /** + * Constructor + *@param sec seconds + */ + public TimeValue (long sec) + { + this (sec, 0); + } + + /** + * Constructor + *@param sec seconds + *@param nanos nanoseconds + */ + public TimeValue (long sec, int nanos) + { + this.set (sec, nanos); + } + + /** + * Sets the seconds and nanoseconds of Time Value + *@param sec seconds + *@param nanos nanoseconds + */ + public void set (long sec, int nanos) + { + this.millisec_ = sec * 1000; + this.nanos_ = nanos; + this.normalize (); + } + + /** + * Get seconds + *@return Seconds + */ + public long sec () + { + return this.millisec_/1000; + } + + /** + * Get nanoseconds + *@return Nanoseconds + */ + public int nanos () + { + return this.nanos_; + } + + /** + * Get time in milliseconds. + *@return time in milliseconds + */ + public long getMilliTime () + { + return this.millisec_; + } + + /** + * Get a String representation of the Time Value. + *@return String representation of the Time Value + */ + public String toString () + { + return (new Long (this.millisec_/1000)).toString () + ":" + + (new Integer (this.nanos_)).toString (); + } + + /** + * Get current time. + *@return the current system time as a new TimeValue + */ + public static TimeValue getTimeOfDay () + { + return new TimeValue (System.currentTimeMillis ()/1000); + } + + /** + * Return a new TimeValue that represents the current system time + * of day offset by the given number of seconds and nanoseconds. + *@param sec Number of seconds to offset by + *@param nanos Number of nanoseconds to offset by + *@see JACE.ASX.TimeValue + *@return TimeValue for the system time plus the given offset + */ + public static TimeValue relativeTimeOfDay(long sec, int nanos) + { + return new TimeValue ((System.currentTimeMillis() / 1000) + sec, + nanos); + } + + /** + * Return a new TimeValue that represents the current system time + * of day offset by the given TimeValue. + *@param tv TimeValue to offset by + *@see JACE.ASX.TimeValue + *@return TimeValue for the system time plus the given offset + */ + public static TimeValue relativeTimeOfDay(TimeValue offset) + { + return new TimeValue ((System.currentTimeMillis() / 1000) + + offset.sec(), + offset.nanos()); + } + + /** + * Compare two Time Values for equality. + *@param tv Time Value to compare with + *@return true if the two Time Values are equal, false otherwise + */ + public boolean equals (TimeValue tv) + { + return this.millisec_ == (tv.sec () * 1000) && this.nanos_ == tv.nanos (); + } + + /** + * Compare two Time Values for non-equality. + *@param tv Time Value to compare with + *@return true if the two Time Values are not equal, false otherwise + */ + public boolean notEquals (TimeValue tv) + { + return !this.equals (tv); + } + + /** + * Add two Time Values. + *@param tv1 The first Time Value + *@param tv2 The second Time Value + *@return sum of the two Time Values. + */ + public static TimeValue plus (TimeValue tv1, TimeValue tv2) + { + TimeValue tv = new TimeValue (tv1.sec () + tv2.sec (), + tv1.nanos () + tv2.nanos ()); + tv.normalize (); + return tv; + } + + /** + * Subtract two Time Values. + *@param tv1 The first Time Value + *@param tv2 The second Time Value + *@return difference of the two Time Values. + */ + public static TimeValue minus (TimeValue tv1, TimeValue tv2) + { + TimeValue tv = new TimeValue (tv1.sec () - tv2.sec (), + tv1.nanos () - tv2.nanos ()); + tv.normalize (); + return tv; + } + + /** + * Add Time Value to "this". + *@param tv The Time Value to add to this. + */ + public void plusEquals (TimeValue tv) + { + this.set (this.sec () + tv.sec (), + this.nanos () + tv.nanos ()); + this.normalize (); + } + + /** + * Subtract Time Value from "this". + *@param tv The Time Value to subtract from this. + */ + public void minusEquals (TimeValue tv) + { + this.set (this.sec () - tv.sec (), + this.nanos () - tv.nanos ()); + this.normalize (); + } + + /** + * Compare two Time Values for less than. + *@param tv Time Value to compare with + *@return true if "this" is less than tv, false otherwise + */ + public boolean lessThan (TimeValue tv) + { + return tv.greaterThan (this); + } + + /** + * Compare two Time Values for greater than. + *@param tv Time Value to compare with + *@return true if "this" is greater than tv, false otherwise + */ + public boolean greaterThan (TimeValue tv) + { + if (this.sec () > tv.sec ()) + return true; + else if (this.sec () == tv.sec () + && this.nanos () > tv.nanos ()) + return true; + else + return false; + } + + /** + * Compare two Time Values for <=. + *@param tv Time Value to compare with + *@return true if "this" <= tv, false otherwise + */ + public boolean lessThanEqual (TimeValue tv) + { + return tv.greaterThanEqual (this); + } + + /** + * Compare two Time Values for >=. + *@param tv Time Value to compare with + *@return true if "this" >= tv, false otherwise + */ + public boolean greaterThanEqual (TimeValue tv) + { + return this.sec () >= tv.sec () && this.nanos () >= tv.nanos (); + } + + private void normalize () + { + if (this.nanos_ >= ONE_MILLISECOND) + { + do + { + this.millisec_++; + this.nanos_ -= ONE_MILLISECOND; + } + while (this.nanos_ >= ONE_MILLISECOND); + } + else if (this.nanos_ <= -ONE_MILLISECOND) + { + do + { + this.millisec_--; + this.nanos_ += ONE_MILLISECOND; + } + while (this.nanos_ <= -ONE_MILLISECOND); + } + + if (this.millisec_ >= 1 && this.nanos_ < 0) + { + this.millisec_--; + this.nanos_ += ONE_MILLISECOND; + } + else if (this.millisec_ < 0 && this.nanos_ > 0) + { + this.millisec_++; + this.nanos_ -= ONE_MILLISECOND; + } + } + + private long millisec_; + private int nanos_; + private final static int ONE_MILLISECOND = 1000000; +} diff --git a/java/JACE/ASX/TimedWait.java b/java/JACE/ASX/TimedWait.java new file mode 100644 index 00000000000..dc1d0bab673 --- /dev/null +++ b/java/JACE/ASX/TimedWait.java @@ -0,0 +1,157 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * TimedWait.java + * + *@author Prashant Jain and Doug Schmidt + * + *************************************************/ +package JACE.ASX; + +/** + * A wait/notify system with absolute time timeouts and built-in + * check of a condition. <P> + * + * Subclasses define the condition to check, and the object to + * wait on can be specified. + */ +public abstract class TimedWait +{ + /** + * Default Constructor. Sets "this" to be used for the delegation of + * the wait() call to. + */ + public TimedWait () + { + object_ = this; + } + + /** + * Constructor. Allows subclasses to supply us with an Object that + * is delegated the wait() call. + *@param obj The Object that is delegated the wait() call. + */ + public TimedWait (Object obj) + { + object_ = obj; + } + + /** + * Hook method that needs to be implemented by subclasses. + */ + public abstract boolean condition (); + + /** + * Wait until condition becomes true. Note that the method + * blocks. Also note that this method is final to ensure that no one + * overrides it. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + *@exception InterruptedException Interrupted during wait + */ + public final void timedWait () throws InterruptedException + { + // Acquire the monitor lock. + if (!condition ()) + { + // Only attempt to perform the wait if the condition isn't + // true initially. + for (;;) + { + // Wait until we are notified. + object_.wait (); + + // Recheck the condition. + if (condition ()) + break; // Condition became true. + + // else we were falsely notified so go back into wait + } + } + } + + /** + * Template Method that implements the actual timed wait. Note that + * this method is final to ensure that no one overrides it. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + * If the specified wait time is zero, this checks the condition, + * then returns on success or throws a TimeoutException on failure. + *@param tv Absolute time to wait until before throwing an exception + * if the condition isn't satisfied + *@exception java.lang.InterruptedException Interrupted during wait + *@exception JACE.ASX.TimeoutException Reached timeout specified + */ + public final void timedWait (TimeValue tv) + throws InterruptedException, + TimeoutException + { + if (tv == null) { + this.timedWait(); + return; + } + + // Acquire the monitor lock. + if (!condition ()) + { + long start = System.currentTimeMillis(); + long waitTime = tv.getMilliTime() - start; + + for (;;) { + + // Prevent a conversion from absolute to relative time from + // generating a zero or negative waitTime. + if (waitTime < 1) + throw new TimeoutException (); + + // Wait until we are notified. + object_.wait (waitTime); + + // Recheck the condition. + if (!condition ()) { + + long now = System.currentTimeMillis(); + + // Timed out! + if (now >= tv.getMilliTime ()) + throw new TimeoutException (); + else + // We still have some time left to wait, so adjust the + // wait_time. + waitTime = tv.getMilliTime() - now; + } + else + break; // Condition became true. + } + } + } + + /** + * Notify any one thread waiting on the object_. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + */ + public final void signal () { + object_.notify (); + } + + /** + * Notify all threads waiting on the object_. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + */ + public final void broadcast () { + object_.notifyAll (); + } + + /** + * The object we delegate to. If a subclass gives us a particular + * object, we use that to delegate to, otherwise, we ``delegate'' + * to ourself (i.e., this). + */ + protected Object object_; + +} diff --git a/java/JACE/ASX/TimeoutException.java b/java/JACE/ASX/TimeoutException.java new file mode 100644 index 00000000000..d55cc4fe999 --- /dev/null +++ b/java/JACE/ASX/TimeoutException.java @@ -0,0 +1,37 @@ +/************************************************* + * + * = PACKAGE + * JACE.ASX + * + * = FILENAME + * TimeoutException.java + * + *@author Prashant Jain and Doug Schmidt + * + *************************************************/ +package JACE.ASX; + +/** + * Thrown when a timer has expired. + */ +public class TimeoutException extends Exception +{ + /** + * Default Constructor. + */ + public TimeoutException () + { + super ("Timed Out"); + } + + /** + * Constructor. + *@param timeout The timeout value which expired. + *@param desc Textual description of the exception + */ + public TimeoutException (TimeValue timeout, String desc) + { + super ("Timed Out in " + timeout + ": " + desc); + } + +} diff --git a/java/JACE/ASX/package.html b/java/JACE/ASX/package.html new file mode 100644 index 00000000000..346782ed083 --- /dev/null +++ b/java/JACE/ASX/package.html @@ -0,0 +1,11 @@ +<!-- $Id$ --> +<HTML> +<BODY> +Message queueing facilities. +<P> +@see <a href="http://www.cs.wustl.edu/~schmidt/ACE-papers.html#ipc"> +Documents on ACE interprocess communication components</a> +@see <a href="http://www.cs.wustl.edu/~schmidt/ACE-papers.html#streams"> +Documents on the ACE streams framework</a> +</BODY> +</HTML> |