diff options
author | eea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-24 23:12:58 +0000 |
---|---|---|
committer | eea1 <eea1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-24 23:12:58 +0000 |
commit | 45cc4c4cd115a04b408ac3a114e20dc4565885c7 (patch) | |
tree | 867093c38f07afc45b9c7a59af7c2b5e10fc9119 /java/JACE | |
parent | 6bef4fc6b22cfd13bb89be38e0877983e2c4a005 (diff) | |
download | ATCD-45cc4c4cd115a04b408ac3a114e20dc4565885c7.tar.gz |
Updated source files for tests/ASX.
Diffstat (limited to 'java/JACE')
-rw-r--r-- | java/JACE/tests/ASX/BufferStreamTest.java | 189 | ||||
-rw-r--r-- | java/JACE/tests/ASX/MessageQueueTest.java | 54 | ||||
-rw-r--r-- | java/JACE/tests/ASX/PriorityBufferTest.java | 118 | ||||
-rw-r--r-- | java/JACE/tests/ASX/TaskTest.java | 95 | ||||
-rw-r--r-- | java/JACE/tests/ASX/ThreadPoolTest.java | 187 |
5 files changed, 643 insertions, 0 deletions
diff --git a/java/JACE/tests/ASX/BufferStreamTest.java b/java/JACE/tests/ASX/BufferStreamTest.java new file mode 100644 index 00000000000..a9600da19dc --- /dev/null +++ b/java/JACE/tests/ASX/BufferStreamTest.java @@ -0,0 +1,189 @@ +// ============================================================================ +// +// = PACKAGE +// tests.ASX +// +// = FILENAME +// BufferStreamTest.java +// +// = AUTHOR +// Prashant Jain +// +// ============================================================================ +package JACE.tests.ASX; + +import java.io.*; +import JACE.OS.*; +import JACE.ASX.*; + +/** + * This short program copies stdin to stdout via the use of an ASX + * STREAM. It illustrates an implementation of the classic "bounded + * buffer" program using an ASX STREAM containing two Modules. Each + * Module contains two Tasks. + */ +public class BufferStreamTest +{ + + static class CommonTask extends Task + { + // ACE_Task hooks + public int open (Object obj) + { + if (this.activate (0, 1, false) == -1) + ACE.ERROR ("spawn"); + return 0; + } + + public int close (long exitStatus) + { + ACE.DEBUG (Thread.currentThread () + " thread is exiting with status " + + exitStatus + " in module " + this.name () + "\n"); + return 0; + } + + public int put (MessageBlock mb, TimeValue tv) + { + return 0; + } + + public int handleTimeout (TimeValue tv, Object obj) + { + return 0; + } + } + + // Define the Producer interface. + + static class Producer extends CommonTask + { + // Read data from stdin and pass to consumer. + // The Consumer reads data from the stdin stream, creates a message, + // and then queues the message in the message list, where it is + // removed by the consumer thread. A 0-sized message is enqueued when + // there is no more data to read. The consumer uses this as a flag to + // know when to exit. + + public int svc () + { + // Keep reading stdin, until we reach EOF. + + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + + String msg = null; + try + { + while (true) + { + System.out.print ("Enter input: "); + System.out.flush (); + msg = in.readLine (); + if (msg == null) + { + // Send a shutdown message to the other thread and exit. + if (this.putNext (new MessageBlock (0), null) == -1) + ACE.ERROR ("putNext"); + break; + } + else + { + // Send the message to the other thread. + if (this.putNext (new MessageBlock (msg), null) == -1) + ACE.ERROR ("putNext"); + } + } + } + catch (IOException e) + { + } + return 0; + } + } + + static class Consumer extends CommonTask + // = TITLE + // Define the Consumer interface. + { + // Enqueue the message on the MessageQueue for subsequent + // handling in the svc() method. + public int put (MessageBlock mb, TimeValue tv) + { + try + { + return this.putq (mb, tv); + } + catch (InterruptedException e) + { + } + return 0; + } + + // The consumer dequeues a message from the ACE_Message_Queue, writes + // the message to the stderr stream, and deletes the message. The + // Consumer sends a 0-sized message to inform the consumer to stop + // reading and exit. + + public int svc () + { + MessageBlock mb = null; + + // Keep looping, reading a message out of the queue, until we + // timeout or get a message with a length == 0, which signals us to + // quit. + try + { + while (true) + { + // Wait for upto 4 seconds + mb = this.getq (TimeValue.relativeTimeOfDay (4, 0)); + + if (mb == null) + break; + + int length = mb.length (); + + if (length > 0) + System.out.println ("\n" + mb.base ()); + + if (length == 0) + break; + } + } + catch (InterruptedException e) + { + } + if (mb == null) + { + ACE.ERROR ("timed out waiting for message"); + System.exit (1); + } + return 0; + } + } + + // Spawn off a new thread. + + public static void main (String args[]) + { + ACE.enableDebugging (); + + // Control hierachically-related active objects + Stream stream = new Stream (); + Module pm = new Module ("Consumer", new Consumer (), null, null); + Module cm = new Module ("Producer", new Producer (), null, null); + + // Create Producer and Consumer Modules and push them onto the + // STREAM. All processing is performed in the STREAM. + + if (stream.push (pm) == -1) + { + ACE.ERROR ("push"); + return; + } + else if (stream.push (cm) == -1) + { + ACE.ERROR ("push"); + return; + } + } +} diff --git a/java/JACE/tests/ASX/MessageQueueTest.java b/java/JACE/tests/ASX/MessageQueueTest.java new file mode 100644 index 00000000000..38f098c1bb7 --- /dev/null +++ b/java/JACE/tests/ASX/MessageQueueTest.java @@ -0,0 +1,54 @@ +// ============================================================================ +// +// = PACKAGE +// tests.ASX +// +// = FILENAME +// MessageQueueTest.java +// +// = AUTHOR +// Prashant Jain +// +// ============================================================================ +package JACE.tests.ASX; + +import java.io.*; +import JACE.OS.*; +import JACE.ASX.*; + +public class MessageQueueTest +{ + public static void main (String args[]) + { + ACE.enableDebugging (); + + try + { + MessageBlock conMb; + MessageQueue msgQueue = new MessageQueue (); + MessageBlock mb1 = new MessageBlock ("hello"); + MessageBlock mb2 = new MessageBlock ("world"); + mb1.msgPriority (5); + mb2.msgPriority (7); + + // Enqueue in priority order. + if (msgQueue.enqueue (mb1) == -1) + ACE.ERROR ("put_next"); + + if (msgQueue.enqueue (mb2) == -1) + ACE.ERROR ("put_next"); + + // Now try to dequeue + if ((conMb = msgQueue.dequeueHead ()) == null) + ACE.ERROR ("dequeueHead"); + else + ACE.DEBUG ("Consumer: removed item " + conMb.base () + + " of priority " + conMb.msgPriority ()); + } + catch (InterruptedException e) + { + ACE.ERROR (e); + } + } +} + diff --git a/java/JACE/tests/ASX/PriorityBufferTest.java b/java/JACE/tests/ASX/PriorityBufferTest.java new file mode 100644 index 00000000000..37f0370522d --- /dev/null +++ b/java/JACE/tests/ASX/PriorityBufferTest.java @@ -0,0 +1,118 @@ +// ============================================================================ +// +// = PACKAGE +// tests.ASX +// +// = FILENAME +// PriorityBufferTest.java +// +// = AUTHOR +// Prashant Jain +// +// ============================================================================ +package JACE.tests.ASX; + +import java.io.*; +import JACE.OS.*; +import JACE.ASX.*; + +class consumer extends Thread +{ + public void run () + { + MessageBlock mb = null; + long curPriority = 0; + int length = 0; + + try + { + // Keep looping, reading a message out of the queue, until we + // get a message with a length == 0, which signals us to quit. + for (;;) + { + if ((mb = PriorityBufferTest.msgQueue.dequeueHead ()) == null) + break; + + length = mb.length (); + curPriority = mb.msgPriority (); + + if (length > 0) + ACE.DEBUG ("Consumer: removed item \"" + mb.base () + "\" of priority: " + curPriority); + + if (length == 0) + break; + } + } + catch (InterruptedException e) + { + } + } +} + +class producer extends Thread +{ + producer (int delay) + { + this.delay_ = delay; + } + + public void run () + { + try + { + long count = 0; + for (char c = 'a'; c <= 'z'; c++) + { + count++; + // Allocate a new message + MessageBlock mb = new MessageBlock (new Character (c).toString ()); + // Set the priority + mb.msgPriority (count); + + // Enqueue in priority order. + if (PriorityBufferTest.msgQueue.enqueue (mb) == -1) + ACE.ERROR ("put_next"); + else + { + ACE.DEBUG ("Producer: inserted item \"" + mb.base () + "\" of priority: " + count); + if (this.delay_ > 0) + this.sleep (this.delay_); + } + } + + // Now send a 0-sized shutdown message to the other thread + if (PriorityBufferTest.msgQueue.enqueueTail (new MessageBlock (0)) == -1) + ACE.ERROR ("put_next"); + } + catch (InterruptedException e) + { + } + } + + private int delay_; +} + +public class PriorityBufferTest +{ + public static MessageQueue msgQueue = new MessageQueue (); + + public static void main (String args[]) + { + ACE.enableDebugging (); + + int delay = 0; + if (args.length == 1) + { + try + { + delay = Integer.parseInt (args[0]); + } + catch (NumberFormatException e) + { + ACE.ERROR ("Illegal argument."); + } + } + new producer (delay).start (); + new consumer ().start (); + } +} diff --git a/java/JACE/tests/ASX/TaskTest.java b/java/JACE/tests/ASX/TaskTest.java new file mode 100644 index 00000000000..09ebfe22a1f --- /dev/null +++ b/java/JACE/tests/ASX/TaskTest.java @@ -0,0 +1,95 @@ +// ============================================================================ +// +// = PACKAGE +// tests.ASX +// +// = FILENAME +// TaskTest.java +// +// = AUTHOR +// Prashant Jain +// +// ============================================================================ +package JACE.tests.ASX; + +import java.io.*; +import JACE.OS.*; +import JACE.ASX.*; +import JACE.Reactor.*; + +public class TaskTest extends Task +{ + int nThreads_; + int nIterations_; + + public TaskTest (int nThreads, int nIterations) + { + this.nIterations_ = nIterations; + this.nThreads_ = nThreads; + } + + public void beginTest () + { + if (this.activate (0, this.nThreads_, true) == -1) + ACE.ERROR ("activate failed"); + } + + public int open (Object obj) + { + return 0; + } + + public int close (long flags) + { + return 0; + } + + public int put (MessageBlock mb, TimeValue tv) + { + return 0; + } + + public int handleTimeout (TimeValue tv, Object obj) + { + return 0; + } + + public int svc () + { + for (int i = 1; i <= this.nIterations_; i++) + { + ACE.DEBUG (Thread.currentThread ().toString () + + " in iteration " + i); + // Allow other threads to run + Thread.yield (); + } + return 0; + } + + public static void main (String args[]) + { + int nThreads = 1; + int nIterations = 1; + + ACE.enableDebugging (); + + try + { + if (args.length == 2) + { + nThreads = Integer.parseInt (args[0]); + nIterations = Integer.parseInt (args[1]); + } + else if (args.length == 1) + { + nThreads = Integer.parseInt (args[0]); + } + } + catch (NumberFormatException e) + { + ACE.ERROR ("Illegal argument."); + } + TaskTest tt = new TaskTest (nThreads, nIterations); + tt.beginTest (); + } +} diff --git a/java/JACE/tests/ASX/ThreadPoolTest.java b/java/JACE/tests/ASX/ThreadPoolTest.java new file mode 100644 index 00000000000..c48b0caf3c2 --- /dev/null +++ b/java/JACE/tests/ASX/ThreadPoolTest.java @@ -0,0 +1,187 @@ +// ============================================================================ +// +// = PACKAGE +// tests.ASX +// +// = FILENAME +// ThreadPoolTest.java +// +// = AUTHOR +// Prashant Jain +// +// ============================================================================ +package JACE.tests.ASX; + +import java.io.*; +import JACE.OS.*; +import JACE.ASX.*; +import JACE.Reactor.*; + +public class ThreadPoolTest extends Task +{ + int nThreads_; + int nIterations_; + + public static int MAX_MB_SIZE = 1024; + + public ThreadPoolTest (int nThreads, int nIterations) + { + this.nIterations_ = nIterations; + this.nThreads_ = nThreads; + if (this.activate (0, nThreads, true) == -1) + ACE.ERROR ("activate failed"); + } + + public int handleTimeout (TimeValue tv, Object obj) + { + return 0; + } + + public int open (Object obj) + { + return 0; + } + + public int close (long flags) + { + return 0; + } + + public int put (MessageBlock mb, TimeValue tv) + { + try + { + return this.putq (mb, tv); + } + catch (InterruptedException e) + { + } + return 0; + } + + public int svc () + { + int count = 1; + + // Keep looping, reading a message out of the queue, until we get a + // message with a length == 0, which signals us to quit. + try + { + for (;; count++) + { + MessageBlock mb = this.getq (null); + if (mb == null) + { + ACE.ERROR (Thread.currentThread ().toString () + " in iteration " + count + ", got result -1, exiting"); + break; + } + int length = mb.length (); + + if (length > 0) + ACE.DEBUG (Thread.currentThread ().toString () + + " in iteration " + count + ", length = " + + length + ", text = \"" + mb.base () + "\""); + + if (length == 0) + { + ACE.DEBUG (Thread.currentThread ().toString () + + " in iteration " + count + + ", got NULL message, exiting"); + break; + } + Thread.yield (); + } + } + catch (InterruptedException e) + { + } + return 0; + } + + public static void produce (ThreadPoolTest threadPool, int nIterations) + { + int count = 0; + for (int n = 0;;) + { + // Allocate a new message. + MessageBlock mb = new MessageBlock (new Integer (count).toString ()); + + if (count == nIterations) + n = 1; // Indicate that we need to shut down. + else + count++; + + if (count == 0 || (count % 20 == 0)) + { + try + { + Thread.sleep (1); + } + catch (InterruptedException e) + { + } + } + if (n != 1) + { + ACE.DEBUG ("Producing..."); + // Pass the message to the Thread_Pool. + if (threadPool.put (mb, null) == -1) + ACE.ERROR ("put"); + } + else + { + // Send a shutdown message to the waiting threads and exit. + ACE.DEBUG ("start loop, dump of task"); + + for (int i = threadPool.thrCount (); i > 0; i--) + { + ACE.DEBUG (Thread.currentThread ().toString () + + "EOF, enqueueing NULL block for thread " + i); + + // Enqueue a NULL message to flag each consumer to + // shutdown. + if (threadPool.put (new MessageBlock (0), null) == -1) + ACE.ERROR ("put"); + } + + break; + } + } + } + + public static void main (String args[]) + { + int nThreads = 1; + int nIterations = 100; + + ACE.enableDebugging (); + + try + { + if (args.length == 2) + { + nThreads = Integer.parseInt (args[0]); + nIterations = Integer.parseInt (args[1]); + } + else if (args.length == 1) + { + nThreads = Integer.parseInt (args[0]); + } + } + catch (NumberFormatException e) + { + ACE.ERROR ("Illegal argument."); + } + ACE.DEBUG ("Threads = " + nThreads + " Iterations = " + nIterations); + + // Create the worker tasks. + ThreadPoolTest threadPool = new ThreadPoolTest (nThreads, + nIterations); + + // Create work for the worker tasks to process in their own threads. + produce (threadPool, nIterations); + ACE.DEBUG ("exiting..."); + } +} + + |