summaryrefslogtreecommitdiff
path: root/java/tests/ASX/BufferStreamTest.java
blob: c61f94f281ee0c34bd5d03f55ddb4ab5de6f6a13 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// ============================================================================
//
// = PACKAGE
//    tests.ASX
// 
// = FILENAME
//    BufferStreamTest.java
//
// = AUTHOR
//    Prashant Jain
// 
// ============================================================================
package tests.ASX;

import java.io.*;
import JACE.OS.*;
import JACE.ASX.*;

// This short program copies stdin to stdout via the use of an ASX
// STREAM.  It illustrates an implementation of the classic "bounded
// buffer" program using an ASX STREAM containing two Modules.  Each
// Module contains two Tasks.  

class CommonTask extends Task
{
  // ACE_Task hooks
  public int open (Object obj)
  {
    if (this.activate (0, 1, false) == -1)
      ACE.ERROR ("spawn");
    return 0;
  }

  public int close (long exitStatus)
  {
    ACE.DEBUG (Thread.currentThread () + " thread is exiting with status "  +
	       exitStatus + " in module " + this.name () + "\n");
    return 0;
  }

  public int put (MessageBlock mb, TimeValue tv) 
  { 
    return 0; 
  }

  public int handleTimeout (TimeValue tv, Object obj)
  {
    return 0;
  }
}

// Define the Producer interface. 

class Producer extends CommonTask
{
  // Read data from stdin and pass to consumer.
  // The Consumer reads data from the stdin stream, creates a message,
  // and then queues the message in the message list, where it is
  // removed by the consumer thread.  A 0-sized message is enqueued when
  // there is no more data to read.  The consumer uses this as a flag to
  // know when to exit.

  public int svc ()
  {
    // Keep reading stdin, until we reach EOF. 

      BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); 

      String msg = null;
      try
	{
	  while (true)
	    {
	      System.out.print ("Enter input: ");
	      System.out.flush ();
	      msg = in.readLine ();
	      if (msg == null)
		{
		  // Send a shutdown message to the other thread and exit.
		  if (this.putNext (new MessageBlock (0), new TimeValue ()) == -1)
		    ACE.ERROR ("putNext");
		  break;
		}
	      else
		{
		  // Send the message to the other thread.
		  if (this.putNext (new MessageBlock (msg), new TimeValue ()) == -1)
		    ACE.ERROR ("putNext");
		}
	    }
	}
      catch (IOException e)
	{
	}
      return 0;
  }
}

class Consumer extends CommonTask
  // = TITLE
  //    Define the Consumer interface. 
{
  // Enqueue the message on the MessageQueue for subsequent
  // handling in the svc() method.
  public int put (MessageBlock mb, TimeValue tv)
    { 
      try
	{
	  return this.putq (mb, tv);
	}
      catch (InterruptedException e)
	{
	}
      return 0;
    }

  // The consumer dequeues a message from the ACE_Message_Queue, writes
  // the message to the stderr stream, and deletes the message.  The
  // Consumer sends a 0-sized message to inform the consumer to stop
  // reading and exit.

  public int svc ()
    {
      MessageBlock mb = null;

      // Keep looping, reading a message out of the queue, until we
      // timeout or get a message with a length == 0, which signals us to
      // quit.
      try
	{
	  while (true)
	    {
	      // Wait for upto 4 seconds
	      mb = this.getq (new TimeValue (4));
	  
	      if (mb == null)
		break;
	  
	      int length = mb.length ();
	  
	      if (length > 0)
		System.out.println ("\n" + mb.base ());

	      if (length == 0)
		break;
	    }
	}
      catch (InterruptedException e)
	{
	}
      if (mb == null)
	{
	  ACE.ERROR ("timed out waiting for message");
	  System.exit (1);
	}
      return 0;
    }
}

// Spawn off a new thread.

public class BufferStreamTest
{
  public static void main (String args[])
  {
    // Control hierachically-related active objects
    Stream stream = new Stream ();
    Module pm = new Module ("Consumer", new Consumer (), null, null);
    Module cm = new Module ("Producer", new Producer (), null, null);

    // Create Producer and Consumer Modules and push them onto the
    // STREAM.  All processing is performed in the STREAM.

    if (stream.push (pm) == -1)
      {
	ACE.ERROR ("push");
	return;
      }
    else if (stream.push (cm) == -1)
      {
	ACE.ERROR ("push");
	return;
      }
  }
}