/************************************************* * * = PACKAGE * JACE.ASX * * = FILENAME * Stream.java * *@author Prashant Jain * *************************************************/ package JACE.ASX; import JACE.OS.*; /** *
*

SYNOPSIS

*
* This class is the primary abstraction for the ASX framework. * It is moduled after System V Stream. *
* *

DESCRIPTION

*
* A Stream consists of a stack of , each of which * contains two . *
*/ public class Stream { public Stream () { this (null, null, null); } // Create a Stream consisting of and as the Stream // head and Stream tail, respectively. If these are 0 then the // and are used, respectively. // is the value past in to the open() methods of the tasks. public Stream (Object a, Module head, Module tail) { this.linkedUs_ = null; // this.final_close_ = this.lock_; if (this.open (a, head, tail) == -1) ACE.ERROR ("open" + head.name () + " " + tail.name ()); } public int push (Module newTop) { if (this.pushModule (newTop, this.streamHead_.next (), this.streamHead_) == -1) return -1; else return 0; } public int put (MessageBlock mb, TimeValue tv) { return this.streamHead_.writer ().put (mb, tv); } public MessageBlock get (TimeValue tv) throws InterruptedException { return this.streamHead_.reader ().getq (tv); } // Return the "top" ACE_Module in a ACE_Stream, skipping over the // stream_head. public Module top () { if (this.streamHead_.next () == this.streamTail_) return null; else return this.streamHead_.next (); } // Remove the "top" ACE_Module in a ACE_Stream, skipping over the // stream_head. public int pop (long flags) { if (this.streamHead_.next () == this.streamTail_) return -1; else { // Skip over the ACE_Stream head. Module top = this.streamHead_.next (); Module newTop = top.next (); this.streamHead_.next (newTop); // Close the top ACE_Module. top.close (flags); this.streamHead_.writer ().next (newTop.writer ()); newTop.reader ().next (this.streamHead_.reader ()); return 0; } } // Remove a named ACE_Module from an arbitrary place in the // ACE_Stream. public int remove (String name, long flags) { Module prev = null; for (Module mod = this.streamHead_; mod != null; mod = mod.next ()) if (name.compareTo (mod.name ()) == 0) { if (prev == null) // Deleting ACE_Stream Head this.streamHead_.link (mod.next ()); else prev.link (mod.next ()); mod.close (flags); return 0; } else prev = mod; return -1; } public Module find (String name) { for (Module mod = this.streamHead_; mod != null; mod = mod.next ()) if (name.compareTo (mod.name ()) == 0) return mod; return null; } // Actually push a module onto the stack... private int pushModule (Module newTop, Module currentTop, Module head) { Task ntReader = newTop.reader (); Task ntWriter = newTop.writer (); Task ctReader = null; Task ctWriter = null; if (currentTop != null) { ctReader = currentTop.reader (); ctWriter = currentTop.writer (); ctReader.next (ntReader); } ntWriter.next (ctWriter); if (head != null) { if (head != newTop) head.link (newTop); } else ntReader.next (null); newTop.next (currentTop); if (ntReader.open (newTop.arg ()) == -1) return -1; if (ntWriter.open (newTop.arg ()) == -1) return -1; return 0; } public synchronized int open (Object a, Module head, Module tail) { Task h1 = null, h2 = null; Task t1 = null, t2 = null; if (head == null) { h1 = new StreamHead (); h2 = new StreamHead (); head = new Module ("ACEStreamHead", h1, h2, a); } if (tail == null) { t1 = new StreamTail (); t2 = new StreamTail (); tail = new Module ("ACEStreamTail", t1, t2, a); } // Make sure *all* the allocation succeeded! if (h1 == null || h2 == null || head == null || t1 == null || t2 == null || tail == null) { // Close up! head.close (0); tail.close (0); return -1; } this.streamHead_ = head; this.streamTail_ = tail; if (this.pushModule (this.streamTail_, null, null) == -1) return -1; else if (this.pushModule (this.streamHead_, this.streamTail_, this.streamHead_) == -1) return -1; else return 0; } public synchronized int close (long flags) { if (this.streamHead_ != null && this.streamTail_ != null) { // Don't bother checking return value here. this.unlinkInternal (); int result = 0; // Remove and cleanup all the intermediate modules. while (this.streamHead_.next () != this.streamTail_) { if (this.pop (flags) == -1) result = -1; } // Clean up the head and tail of the stream. if (this.streamHead_.close (flags) == -1) result = -1; if (this.streamTail_.close (flags) == -1) result = -1; this.streamHead_ = null; this.streamTail_ = null; // Tell all threads waiting on the close that we are done. // this.final_close_.broadcast (); return result; } return 0; } public int control (int cmd, Object a) throws InterruptedException { IOCntlMsg ioc = new IOCntlMsg (cmd); // Create a data block that contains the user-supplied data. MessageBlock db = new MessageBlock (MessageType.MB_IOCTL, null, a); // Create a control block that contains the control field and a // pointer to the data block. MessageBlock cb = new MessageBlock (MessageType.MB_IOCTL, db, (Object) ioc); int result = 0; if (this.streamHead_.writer ().put (cb, new TimeValue ()) == -1) result = -1; else if ((cb = this.streamHead_.reader ().getq (new TimeValue ())) == null) result = -1; else result = ((IOCntlMsg ) cb.obj ()).rval (); return result; } // Link two streams together at their bottom-most Modules (i.e., the // one just above the Stream tail). Note that all of this is premised // on the fact that the Stream head and Stream tail are non-NULL... // This must be called with locks held. private int linkInternal (Stream us) { this.linkedUs_ = us; // Make sure the other side is also linked to us! us.linkedUs_ = this; Module myTail = this.streamHead_; if (myTail == null) return -1; // Locate the module just above our Stream tail. while (myTail.next () != this.streamTail_) myTail = myTail.next (); Module otherTail = us.streamHead_; if (otherTail == null) return -1; // Locate the module just above the other Stream's tail. while (otherTail.next () != us.streamTail_) otherTail = otherTail.next (); // Reattach the pointers so that the two streams are linked! myTail.writer ().next (otherTail.reader ()); otherTail.writer ().next (myTail.reader ()); return 0; } public synchronized int link (Stream us) { return this.linkInternal (us); } // Must be called with locks held... private int unlinkInternal () { // Only try to unlink if we are in fact still linked! if (this.linkedUs_ != null) { Module myTail = this.streamHead_; // Only relink if we still exist! if (myTail != null) { // Find the module that's just before our stream tail. while (myTail.next () != this.streamTail_) myTail = myTail.next (); // Restore the writer's next() link to our tail. myTail.writer ().next (this.streamTail_.writer ()); } Module otherTail = this.linkedUs_.streamHead_; // Only fiddle with the other side if it in fact still remains. if (otherTail != null) { while (otherTail.next () != this.linkedUs_.streamTail_) otherTail = otherTail.next (); otherTail.writer ().next (this.linkedUs_.streamTail_.writer ()); } // Make sure the other side is also aware that it's been unlinked! this.linkedUs_.linkedUs_ = null; this.linkedUs_ = null; return 0; } else return -1; } public synchronized int unlink () { return this.unlinkInternal (); } public void dump () { ACE.DEBUG ("-------- module links --------"); for (Module mp = this.streamHead_; ; mp = mp.next ()) { ACE.DEBUG ("module name = " + mp.name ()); if (mp == this.streamTail_) break; } ACE.DEBUG ("-------- writer links --------"); Task tp; for (tp = this.streamHead_.writer (); ; tp = tp.next ()) { ACE.DEBUG ("writer queue name = " + tp.name ()); tp.dump (); ACE.DEBUG ("-------\n"); if (tp == this.streamTail_.writer () || (this.linkedUs_ != null && tp == this.linkedUs_.streamHead_.reader ())) break; } ACE.DEBUG ("-------- reader links --------\n"); for (tp = this.streamTail_.reader (); ; tp = tp.next ()) { ACE.DEBUG ("reader queue name = " + tp.name ()); tp.dump (); ACE.DEBUG ("-------\n"); if (tp == this.streamHead_.reader () || (this.linkedUs_ != null && tp == this.linkedUs_.streamHead_.writer ())) break; } } Module streamHead_ = null; // Pointer to the head of the stream. Module streamTail_ = null; // Pointer to the tail of the stream. Stream linkedUs_ = null; // Pointer to an adjoining linked stream. // = Synchronization objects used for thread-safe streams. // ACE_SYNCH_MUTEX lock_; // Protect the stream against race conditions. // ACE_SYNCH_CONDITION final_close_; // Use to tell all threads waiting on the close that we are done. }