summaryrefslogtreecommitdiff
path: root/java/JACE/ASX/StreamHead.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/JACE/ASX/StreamHead.java')
-rw-r--r--java/JACE/ASX/StreamHead.java123
1 files changed, 123 insertions, 0 deletions
diff --git a/java/JACE/ASX/StreamHead.java b/java/JACE/ASX/StreamHead.java
new file mode 100644
index 00000000000..1492b43a297
--- /dev/null
+++ b/java/JACE/ASX/StreamHead.java
@@ -0,0 +1,123 @@
+/*************************************************
+ *
+ * = PACKAGE
+ * JACE.ASX
+ *
+ * = FILENAME
+ * StreamHead.java
+ *
+ *@author Prashant Jain
+ *
+ *************************************************/
+package JACE.ASX;
+
+import JACE.OS.*;
+
+/**
+ * Standard module that acts as the head of a ustream.
+ */
+
+public class StreamHead extends Task
+{
+ // Module that acts as the head of a Stream.
+
+ public int open (Object obj)
+ {
+ return 0;
+ }
+
+ public int close (long l)
+ {
+ return 0;
+ }
+
+ public int svc ()
+ {
+ return -1;
+ }
+
+ private int control (MessageBlock mb)
+ {
+
+ IOCntlMsg ioc = (IOCntlMsg) mb.obj ();
+ int cmd = ioc.cmd ();
+
+ switch (cmd)
+ {
+ case IOCntlCmds.SET_LWM:
+ case IOCntlCmds.SET_HWM:
+ this.waterMarks (cmd, mb.cont ().length ());
+ ioc.rval (0);
+ break;
+ default:
+ return 0;
+ }
+ return ioc.rval ();
+ }
+
+ /* Performs canonical flushing at the ACE_Stream Head */
+
+ private int canonicalFlush (MessageBlock mb)
+ {
+ String s = mb.base ();
+ long f = (new Long (s)).longValue ();
+
+ if ((f & TaskFlags.ACE_FLUSHR) != 0)
+ {
+ this.flush (TaskFlags.ACE_FLUSHALL);
+ f &= ~TaskFlags.ACE_FLUSHR;
+ }
+ if ((f & TaskFlags.ACE_FLUSHW) != 0)
+ return this.reply (mb, null);
+ return 0;
+ }
+
+ // Will block forever to add the given MessageBlock
+ public int put (MessageBlock mb)
+ {
+ return this.put (mb, null);
+ }
+
+ // tv is absolute time
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ int res = 0;
+ if (mb.msgType () == MessageType.MB_IOCTL
+ && (res = this.control (mb)) == -1)
+ return res;
+
+ if (this.isWriter ())
+ {
+ return this.putNext (mb, tv);
+ }
+ else /* this.isReader () */
+ {
+ switch (mb.msgType ())
+ {
+ case MessageType.MB_FLUSH:
+ return this.canonicalFlush (mb);
+ default:
+ break;
+ }
+
+ try
+ {
+ return this.putq (mb, tv);
+ }
+ catch (InterruptedException e)
+ {
+ return -1;
+ }
+ }
+ }
+
+ public void dump ()
+ {
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+
+}