summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoreea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-24 23:12:58 +0000
committereea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-24 23:12:58 +0000
commit73c7c80869dbbde686d064f745ae8d65433c8f19 (patch)
tree867093c38f07afc45b9c7a59af7c2b5e10fc9119
parentb63497639790e8470a84e5f622f1ba6ce09d5c48 (diff)
downloadATCD-73c7c80869dbbde686d064f745ae8d65433c8f19.tar.gz
Updated source files for tests/ASX.
-rw-r--r--java/JACE/tests/ASX/BufferStreamTest.java189
-rw-r--r--java/JACE/tests/ASX/MessageQueueTest.java54
-rw-r--r--java/JACE/tests/ASX/PriorityBufferTest.java118
-rw-r--r--java/JACE/tests/ASX/TaskTest.java95
-rw-r--r--java/JACE/tests/ASX/ThreadPoolTest.java187
5 files changed, 643 insertions, 0 deletions
diff --git a/java/JACE/tests/ASX/BufferStreamTest.java b/java/JACE/tests/ASX/BufferStreamTest.java
new file mode 100644
index 00000000000..a9600da19dc
--- /dev/null
+++ b/java/JACE/tests/ASX/BufferStreamTest.java
@@ -0,0 +1,189 @@
+// ============================================================================
+//
+// = PACKAGE
+// tests.ASX
+//
+// = FILENAME
+// BufferStreamTest.java
+//
+// = AUTHOR
+// Prashant Jain
+//
+// ============================================================================
+package JACE.tests.ASX;
+
+import java.io.*;
+import JACE.OS.*;
+import JACE.ASX.*;
+
+/**
+ * This short program copies stdin to stdout via the use of an ASX
+ * STREAM. It illustrates an implementation of the classic "bounded
+ * buffer" program using an ASX STREAM containing two Modules. Each
+ * Module contains two Tasks.
+ */
+public class BufferStreamTest
+{
+
+ static class CommonTask extends Task
+ {
+ // ACE_Task hooks
+ public int open (Object obj)
+ {
+ if (this.activate (0, 1, false) == -1)
+ ACE.ERROR ("spawn");
+ return 0;
+ }
+
+ public int close (long exitStatus)
+ {
+ ACE.DEBUG (Thread.currentThread () + " thread is exiting with status " +
+ exitStatus + " in module " + this.name () + "\n");
+ return 0;
+ }
+
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ return 0;
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+ }
+
+ // Define the Producer interface.
+
+ static class Producer extends CommonTask
+ {
+ // Read data from stdin and pass to consumer.
+ // The Consumer reads data from the stdin stream, creates a message,
+ // and then queues the message in the message list, where it is
+ // removed by the consumer thread. A 0-sized message is enqueued when
+ // there is no more data to read. The consumer uses this as a flag to
+ // know when to exit.
+
+ public int svc ()
+ {
+ // Keep reading stdin, until we reach EOF.
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+
+ String msg = null;
+ try
+ {
+ while (true)
+ {
+ System.out.print ("Enter input: ");
+ System.out.flush ();
+ msg = in.readLine ();
+ if (msg == null)
+ {
+ // Send a shutdown message to the other thread and exit.
+ if (this.putNext (new MessageBlock (0), null) == -1)
+ ACE.ERROR ("putNext");
+ break;
+ }
+ else
+ {
+ // Send the message to the other thread.
+ if (this.putNext (new MessageBlock (msg), null) == -1)
+ ACE.ERROR ("putNext");
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ }
+ return 0;
+ }
+ }
+
+ static class Consumer extends CommonTask
+ // = TITLE
+ // Define the Consumer interface.
+ {
+ // Enqueue the message on the MessageQueue for subsequent
+ // handling in the svc() method.
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ try
+ {
+ return this.putq (mb, tv);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ return 0;
+ }
+
+ // The consumer dequeues a message from the ACE_Message_Queue, writes
+ // the message to the stderr stream, and deletes the message. The
+ // Consumer sends a 0-sized message to inform the consumer to stop
+ // reading and exit.
+
+ public int svc ()
+ {
+ MessageBlock mb = null;
+
+ // Keep looping, reading a message out of the queue, until we
+ // timeout or get a message with a length == 0, which signals us to
+ // quit.
+ try
+ {
+ while (true)
+ {
+ // Wait for upto 4 seconds
+ mb = this.getq (TimeValue.relativeTimeOfDay (4, 0));
+
+ if (mb == null)
+ break;
+
+ int length = mb.length ();
+
+ if (length > 0)
+ System.out.println ("\n" + mb.base ());
+
+ if (length == 0)
+ break;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ if (mb == null)
+ {
+ ACE.ERROR ("timed out waiting for message");
+ System.exit (1);
+ }
+ return 0;
+ }
+ }
+
+ // Spawn off a new thread.
+
+ public static void main (String args[])
+ {
+ ACE.enableDebugging ();
+
+ // Control hierachically-related active objects
+ Stream stream = new Stream ();
+ Module pm = new Module ("Consumer", new Consumer (), null, null);
+ Module cm = new Module ("Producer", new Producer (), null, null);
+
+ // Create Producer and Consumer Modules and push them onto the
+ // STREAM. All processing is performed in the STREAM.
+
+ if (stream.push (pm) == -1)
+ {
+ ACE.ERROR ("push");
+ return;
+ }
+ else if (stream.push (cm) == -1)
+ {
+ ACE.ERROR ("push");
+ return;
+ }
+ }
+}
diff --git a/java/JACE/tests/ASX/MessageQueueTest.java b/java/JACE/tests/ASX/MessageQueueTest.java
new file mode 100644
index 00000000000..38f098c1bb7
--- /dev/null
+++ b/java/JACE/tests/ASX/MessageQueueTest.java
@@ -0,0 +1,54 @@
+// ============================================================================
+//
+// = PACKAGE
+// tests.ASX
+//
+// = FILENAME
+// MessageQueueTest.java
+//
+// = AUTHOR
+// Prashant Jain
+//
+// ============================================================================
+package JACE.tests.ASX;
+
+import java.io.*;
+import JACE.OS.*;
+import JACE.ASX.*;
+
+public class MessageQueueTest
+{
+ public static void main (String args[])
+ {
+ ACE.enableDebugging ();
+
+ try
+ {
+ MessageBlock conMb;
+ MessageQueue msgQueue = new MessageQueue ();
+ MessageBlock mb1 = new MessageBlock ("hello");
+ MessageBlock mb2 = new MessageBlock ("world");
+ mb1.msgPriority (5);
+ mb2.msgPriority (7);
+
+ // Enqueue in priority order.
+ if (msgQueue.enqueue (mb1) == -1)
+ ACE.ERROR ("put_next");
+
+ if (msgQueue.enqueue (mb2) == -1)
+ ACE.ERROR ("put_next");
+
+ // Now try to dequeue
+ if ((conMb = msgQueue.dequeueHead ()) == null)
+ ACE.ERROR ("dequeueHead");
+ else
+ ACE.DEBUG ("Consumer: removed item " + conMb.base ()
+ + " of priority " + conMb.msgPriority ());
+ }
+ catch (InterruptedException e)
+ {
+ ACE.ERROR (e);
+ }
+ }
+}
+
diff --git a/java/JACE/tests/ASX/PriorityBufferTest.java b/java/JACE/tests/ASX/PriorityBufferTest.java
new file mode 100644
index 00000000000..37f0370522d
--- /dev/null
+++ b/java/JACE/tests/ASX/PriorityBufferTest.java
@@ -0,0 +1,118 @@
+// ============================================================================
+//
+// = PACKAGE
+// tests.ASX
+//
+// = FILENAME
+// PriorityBufferTest.java
+//
+// = AUTHOR
+// Prashant Jain
+//
+// ============================================================================
+package JACE.tests.ASX;
+
+import java.io.*;
+import JACE.OS.*;
+import JACE.ASX.*;
+
+class consumer extends Thread
+{
+ public void run ()
+ {
+ MessageBlock mb = null;
+ long curPriority = 0;
+ int length = 0;
+
+ try
+ {
+ // Keep looping, reading a message out of the queue, until we
+ // get a message with a length == 0, which signals us to quit.
+ for (;;)
+ {
+ if ((mb = PriorityBufferTest.msgQueue.dequeueHead ()) == null)
+ break;
+
+ length = mb.length ();
+ curPriority = mb.msgPriority ();
+
+ if (length > 0)
+ ACE.DEBUG ("Consumer: removed item \"" + mb.base () + "\" of priority: " + curPriority);
+
+ if (length == 0)
+ break;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+}
+
+class producer extends Thread
+{
+ producer (int delay)
+ {
+ this.delay_ = delay;
+ }
+
+ public void run ()
+ {
+ try
+ {
+ long count = 0;
+ for (char c = 'a'; c <= 'z'; c++)
+ {
+ count++;
+ // Allocate a new message
+ MessageBlock mb = new MessageBlock (new Character (c).toString ());
+ // Set the priority
+ mb.msgPriority (count);
+
+ // Enqueue in priority order.
+ if (PriorityBufferTest.msgQueue.enqueue (mb) == -1)
+ ACE.ERROR ("put_next");
+ else
+ {
+ ACE.DEBUG ("Producer: inserted item \"" + mb.base () + "\" of priority: " + count);
+ if (this.delay_ > 0)
+ this.sleep (this.delay_);
+ }
+ }
+
+ // Now send a 0-sized shutdown message to the other thread
+ if (PriorityBufferTest.msgQueue.enqueueTail (new MessageBlock (0)) == -1)
+ ACE.ERROR ("put_next");
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ private int delay_;
+}
+
+public class PriorityBufferTest
+{
+ public static MessageQueue msgQueue = new MessageQueue ();
+
+ public static void main (String args[])
+ {
+ ACE.enableDebugging ();
+
+ int delay = 0;
+ if (args.length == 1)
+ {
+ try
+ {
+ delay = Integer.parseInt (args[0]);
+ }
+ catch (NumberFormatException e)
+ {
+ ACE.ERROR ("Illegal argument.");
+ }
+ }
+ new producer (delay).start ();
+ new consumer ().start ();
+ }
+}
diff --git a/java/JACE/tests/ASX/TaskTest.java b/java/JACE/tests/ASX/TaskTest.java
new file mode 100644
index 00000000000..09ebfe22a1f
--- /dev/null
+++ b/java/JACE/tests/ASX/TaskTest.java
@@ -0,0 +1,95 @@
+// ============================================================================
+//
+// = PACKAGE
+// tests.ASX
+//
+// = FILENAME
+// TaskTest.java
+//
+// = AUTHOR
+// Prashant Jain
+//
+// ============================================================================
+package JACE.tests.ASX;
+
+import java.io.*;
+import JACE.OS.*;
+import JACE.ASX.*;
+import JACE.Reactor.*;
+
+public class TaskTest extends Task
+{
+ int nThreads_;
+ int nIterations_;
+
+ public TaskTest (int nThreads, int nIterations)
+ {
+ this.nIterations_ = nIterations;
+ this.nThreads_ = nThreads;
+ }
+
+ public void beginTest ()
+ {
+ if (this.activate (0, this.nThreads_, true) == -1)
+ ACE.ERROR ("activate failed");
+ }
+
+ public int open (Object obj)
+ {
+ return 0;
+ }
+
+ public int close (long flags)
+ {
+ return 0;
+ }
+
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ return 0;
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+
+ public int svc ()
+ {
+ for (int i = 1; i <= this.nIterations_; i++)
+ {
+ ACE.DEBUG (Thread.currentThread ().toString () +
+ " in iteration " + i);
+ // Allow other threads to run
+ Thread.yield ();
+ }
+ return 0;
+ }
+
+ public static void main (String args[])
+ {
+ int nThreads = 1;
+ int nIterations = 1;
+
+ ACE.enableDebugging ();
+
+ try
+ {
+ if (args.length == 2)
+ {
+ nThreads = Integer.parseInt (args[0]);
+ nIterations = Integer.parseInt (args[1]);
+ }
+ else if (args.length == 1)
+ {
+ nThreads = Integer.parseInt (args[0]);
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ ACE.ERROR ("Illegal argument.");
+ }
+ TaskTest tt = new TaskTest (nThreads, nIterations);
+ tt.beginTest ();
+ }
+}
diff --git a/java/JACE/tests/ASX/ThreadPoolTest.java b/java/JACE/tests/ASX/ThreadPoolTest.java
new file mode 100644
index 00000000000..c48b0caf3c2
--- /dev/null
+++ b/java/JACE/tests/ASX/ThreadPoolTest.java
@@ -0,0 +1,187 @@
+// ============================================================================
+//
+// = PACKAGE
+// tests.ASX
+//
+// = FILENAME
+// ThreadPoolTest.java
+//
+// = AUTHOR
+// Prashant Jain
+//
+// ============================================================================
+package JACE.tests.ASX;
+
+import java.io.*;
+import JACE.OS.*;
+import JACE.ASX.*;
+import JACE.Reactor.*;
+
+public class ThreadPoolTest extends Task
+{
+ int nThreads_;
+ int nIterations_;
+
+ public static int MAX_MB_SIZE = 1024;
+
+ public ThreadPoolTest (int nThreads, int nIterations)
+ {
+ this.nIterations_ = nIterations;
+ this.nThreads_ = nThreads;
+ if (this.activate (0, nThreads, true) == -1)
+ ACE.ERROR ("activate failed");
+ }
+
+ public int handleTimeout (TimeValue tv, Object obj)
+ {
+ return 0;
+ }
+
+ public int open (Object obj)
+ {
+ return 0;
+ }
+
+ public int close (long flags)
+ {
+ return 0;
+ }
+
+ public int put (MessageBlock mb, TimeValue tv)
+ {
+ try
+ {
+ return this.putq (mb, tv);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ return 0;
+ }
+
+ public int svc ()
+ {
+ int count = 1;
+
+ // Keep looping, reading a message out of the queue, until we get a
+ // message with a length == 0, which signals us to quit.
+ try
+ {
+ for (;; count++)
+ {
+ MessageBlock mb = this.getq (null);
+ if (mb == null)
+ {
+ ACE.ERROR (Thread.currentThread ().toString () + " in iteration " + count + ", got result -1, exiting");
+ break;
+ }
+ int length = mb.length ();
+
+ if (length > 0)
+ ACE.DEBUG (Thread.currentThread ().toString () +
+ " in iteration " + count + ", length = " +
+ length + ", text = \"" + mb.base () + "\"");
+
+ if (length == 0)
+ {
+ ACE.DEBUG (Thread.currentThread ().toString () +
+ " in iteration " + count +
+ ", got NULL message, exiting");
+ break;
+ }
+ Thread.yield ();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ return 0;
+ }
+
+ public static void produce (ThreadPoolTest threadPool, int nIterations)
+ {
+ int count = 0;
+ for (int n = 0;;)
+ {
+ // Allocate a new message.
+ MessageBlock mb = new MessageBlock (new Integer (count).toString ());
+
+ if (count == nIterations)
+ n = 1; // Indicate that we need to shut down.
+ else
+ count++;
+
+ if (count == 0 || (count % 20 == 0))
+ {
+ try
+ {
+ Thread.sleep (1);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ if (n != 1)
+ {
+ ACE.DEBUG ("Producing...");
+ // Pass the message to the Thread_Pool.
+ if (threadPool.put (mb, null) == -1)
+ ACE.ERROR ("put");
+ }
+ else
+ {
+ // Send a shutdown message to the waiting threads and exit.
+ ACE.DEBUG ("start loop, dump of task");
+
+ for (int i = threadPool.thrCount (); i > 0; i--)
+ {
+ ACE.DEBUG (Thread.currentThread ().toString () +
+ "EOF, enqueueing NULL block for thread " + i);
+
+ // Enqueue a NULL message to flag each consumer to
+ // shutdown.
+ if (threadPool.put (new MessageBlock (0), null) == -1)
+ ACE.ERROR ("put");
+ }
+
+ break;
+ }
+ }
+ }
+
+ public static void main (String args[])
+ {
+ int nThreads = 1;
+ int nIterations = 100;
+
+ ACE.enableDebugging ();
+
+ try
+ {
+ if (args.length == 2)
+ {
+ nThreads = Integer.parseInt (args[0]);
+ nIterations = Integer.parseInt (args[1]);
+ }
+ else if (args.length == 1)
+ {
+ nThreads = Integer.parseInt (args[0]);
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ ACE.ERROR ("Illegal argument.");
+ }
+ ACE.DEBUG ("Threads = " + nThreads + " Iterations = " + nIterations);
+
+ // Create the worker tasks.
+ ThreadPoolTest threadPool = new ThreadPoolTest (nThreads,
+ nIterations);
+
+ // Create work for the worker tasks to process in their own threads.
+ produce (threadPool, nIterations);
+ ACE.DEBUG ("exiting...");
+ }
+}
+
+