summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpjain <pjain@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1997-03-11 16:56:39 +0000
committerpjain <pjain@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1997-03-11 16:56:39 +0000
commit5a1460023f526395948809346f81bbe8706a60a3 (patch)
tree1b0525dcd6311273dadbe33411a159378661d659
parentff38fa58494d4fe4d57ee0a7c3d77445a28716a4 (diff)
downloadATCD-5a1460023f526395948809346f81bbe8706a60a3.tar.gz
Added a new test that tests out the functionality of the Java version
of ACE Stream and Module.
-rw-r--r--java/tests/ASX/BufferStreamTest.java184
-rw-r--r--java/tests/ASX/Makefile3
2 files changed, 186 insertions, 1 deletions
diff --git a/java/tests/ASX/BufferStreamTest.java b/java/tests/ASX/BufferStreamTest.java
new file mode 100644
index 00000000000..b431f78318f
--- /dev/null
+++ b/java/tests/ASX/BufferStreamTest.java
@@ -0,0 +1,184 @@
+// ============================================================================
+//
+// = PACKAGE
+// tests.ASX
+//
+// = FILENAME
+// BufferStreamTest.java
+//
+// = AUTHOR
+// Prashant Jain
+//
+// ============================================================================
+package tests.ASX;
+
+import java.io.*;
+import ACE.OS.*;
+import ACE.ASX.*;
+
+// This short program copies stdin to stdout via the use of an ASX
+// STREAM. It illustrates an implementation of the classic "bounded
+// buffer" program using an ASX STREAM containing two Modules. Each
+// Module contains two Tasks.
+
+class CommonTask extends Task
+{
+ // ACE_Task hooks
+ public int open (Object obj)
+ {
+ if (this.activate (0, 1, false) == -1)
+ ACE.ERROR ("spawn");
+ return 0;
+ }
+
+ public int close (long exitStatus)
+ {
+ ACE.DEBUG (Thread.currentThread () + " thread is exiting with status " +
+ exitStatus + " in module " + this.name () + "\n");
+ return 0;
+ }
+
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ return 0;
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+}
+
+// Define the Producer interface.
+
+class Producer extends CommonTask
+{
+ // Read data from stdin and pass to consumer.
+ // The Consumer reads data from the stdin stream, creates a message,
+ // and then queues the message in the message list, where it is
+ // removed by the consumer thread. A 0-sized message is enqueued when
+ // there is no more data to read. The consumer uses this as a flag to
+ // know when to exit.
+
+ public int svc ()
+ {
+ // Keep reading stdin, until we reach EOF.
+
+ DataInputStream in = new DataInputStream (System.in);
+ String msg = null;
+ try
+ {
+ while (true)
+ {
+ System.out.print ("Enter input: ");
+ System.out.flush ();
+ msg = in.readLine ();
+ if (msg == null)
+ {
+ // Send a shutdown message to the other thread and exit.
+ if (this.putNext (new MessageBlock (0), new TimeValue ()) == -1)
+ ACE.ERROR ("putNext");
+ break;
+ }
+ else
+ {
+ // Send the message to the other thread.
+ if (this.putNext (new MessageBlock (msg), new TimeValue ()) == -1)
+ ACE.ERROR ("putNext");
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ }
+ return 0;
+ }
+}
+
+class Consumer extends CommonTask
+ // = TITLE
+ // Define the Consumer interface.
+{
+ // Enqueue the message on the MessageQueue for subsequent
+ // handling in the svc() method.
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ try
+ {
+ return this.putq (mb, tv);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ return 0;
+ }
+
+ // The consumer dequeues a message from the ACE_Message_Queue, writes
+ // the message to the stderr stream, and deletes the message. The
+ // Consumer sends a 0-sized message to inform the consumer to stop
+ // reading and exit.
+
+ public int svc ()
+ {
+ MessageBlock mb = null;
+
+ // Keep looping, reading a message out of the queue, until we
+ // timeout or get a message with a length == 0, which signals us to
+ // quit.
+ try
+ {
+ while (true)
+ {
+ // Wait for upto 4 seconds
+ mb = this.getq (new TimeValue (4));
+
+ if (mb == null)
+ break;
+
+ int length = mb.length ();
+
+ if (length > 0)
+ System.out.println ("\n" + mb.base ());
+
+ if (length == 0)
+ break;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ if (mb == null)
+ {
+ ACE.ERROR ("timed out waiting for message");
+ System.exit (1);
+ }
+ return 0;
+ }
+}
+
+// Spawn off a new thread.
+
+public class BufferStreamTest
+{
+ public static void main (String args[])
+ {
+ // Control hierachically-related active objects
+ Stream stream = new Stream ();
+ Module pm = new Module ("Consumer", new Consumer (), null, null);
+ Module cm = new Module ("Producer", new Producer (), null, null);
+
+ // Create Producer and Consumer Modules and push them onto the
+ // STREAM. All processing is performed in the STREAM.
+
+ if (stream.push (pm) == -1)
+ {
+ ACE.ERROR ("push");
+ return;
+ }
+ else if (stream.push (cm) == -1)
+ {
+ ACE.ERROR ("push");
+ return;
+ }
+ }
+}
diff --git a/java/tests/ASX/Makefile b/java/tests/ASX/Makefile
index ae62fad49a2..d097cbb0850 100644
--- a/java/tests/ASX/Makefile
+++ b/java/tests/ASX/Makefile
@@ -16,7 +16,8 @@ doc:
files = MessageQueueTest.java \
TaskTest.java \
PriorityBufferTest.java \
- ThreadPoolTest.java
+ ThreadPoolTest.java \
+ BufferStreamTest.java
packages = tests.ASX