summaryrefslogtreecommitdiff
path: root/java/src/StreamHead.java
blob: 37d9c2af0c31cb8f0784f5d06092711049a5405f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*************************************************
 *
 * = PACKAGE
 *    JACE.ASX
 *
 * = FILENAME
 *    StreamHead.java
 *
 *@author Prashant Jain
 *
 *************************************************/
package JACE.ASX;

import JACE.OS.*;

/**
 * <hr>
 * <h2>SYNOPSIS</h2>
 *<blockquote>
 *     Standard module that acts as the head of a ustream.
 *</blockquote>
 */

public class StreamHead extends Task
{
  // Module that acts as the head of a Stream.

  public int open (Object obj)
  {
    return 0;
  }

  public int close (long l)
  {
    return 0;
  }

  public int svc ()
  {
    return -1;
  }

  private int control (MessageBlock mb)
  {
    
    IOCntlMsg ioc = (IOCntlMsg) mb.obj ();
    int cmd = ioc.cmd ();

    switch (cmd)
      {
      case IOCntlCmds.SET_LWM:
      case IOCntlCmds.SET_HWM:
	this.waterMarks (cmd, mb.cont ().length ());
	ioc.rval (0);
      break;
    default:
      return 0;
    }
  return ioc.rval ();
  }

  /* Performs canonical flushing at the ACE_Stream Head */

  private int canonicalFlush (MessageBlock mb)
  {
    String s = mb.base ();
    long f = (new Long (s)).longValue ();

    if ((f & TaskFlags.ACE_FLUSHR) != 0)
      {
	this.flush (TaskFlags.ACE_FLUSHALL);
	f &= ~TaskFlags.ACE_FLUSHR;
      }
    if ((f & TaskFlags.ACE_FLUSHW) != 0)
      return this.reply (mb, new TimeValue ());
    return 0;
  }

  public int put (MessageBlock mb, TimeValue tv)
  {
    int res = 0;
    if (mb.msgType () == MessageType.MB_IOCTL
	&& (res = this.control (mb)) == -1)
      return res;
    
    if (this.isWriter ())
      {
	return this.putNext (mb, tv);
      }
    else /* this.isReader () */
      {      
	switch (mb.msgType ())
	  {
	  case MessageType.MB_FLUSH:
	    return this.canonicalFlush (mb);
	  default:
	    break;
	  }
      
	try
	  {
	    return this.putq (mb, tv);
	  }
	catch (InterruptedException e)
	  {
	    return -1;
	  }
      }
  }

  public void dump ()
  {
  }

  public int handleTimeout (TimeValue tv, Object obj)
  {
    return 0;
  }

}