summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorirfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1996-11-12 08:17:47 +0000
committerirfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1996-11-12 08:17:47 +0000
commit7be1052a518d4559aef31963d8016bcf8451379f (patch)
treea9056d1b2240501a132c4a235e959f1728e06181
parent7f179caaef13f7c23c155f11a78312e657458e8f (diff)
downloadATCD-7be1052a518d4559aef31963d8016bcf8451379f.tar.gz
new implementation of traditional condition variables in java
complete rewrite of RWMutex in terms of "traditional" conditional variables new files to test condition variables
-rw-r--r--java/tests/Concurrency/Condition/Consumer.java73
-rw-r--r--java/tests/Concurrency/Condition/JoinableThreadGroup.java24
-rw-r--r--java/tests/Concurrency/Condition/Makefile25
-rw-r--r--java/tests/Concurrency/Condition/Producer.java67
-rw-r--r--java/tests/Concurrency/Condition/QueueTest.java64
-rw-r--r--java/tests/Concurrency/Condition/SimpleMessageQueue.java86
6 files changed, 339 insertions, 0 deletions
diff --git a/java/tests/Concurrency/Condition/Consumer.java b/java/tests/Concurrency/Condition/Consumer.java
new file mode 100644
index 00000000000..029cee2158a
--- /dev/null
+++ b/java/tests/Concurrency/Condition/Consumer.java
@@ -0,0 +1,73 @@
+//File: Consumer.java
+//Seth Widoff 8/8/96
+//This class attempts at random intervals to dequeue random elements
+//from a queue. If the queue is empty the thread waits until an element
+//has been enqueued and another thread has invoked the notify() method.
+
+package tests.Concurrency.Condition;
+
+import ACE.Reactor.TimeValue;
+import java.util.Random;
+
+public class Consumer implements Runnable
+{
+ //Maximum pause between dequeues (in milliseconds)
+ private static final int MAX_PAUSE = 1000;
+
+ private SimpleMessageQueue queue_;
+ private boolean stop_requested_ = false;
+ private String name_;
+ private int iterations_;
+ private TimeValue timeout_;
+
+ public Consumer(String name,
+ SimpleMessageQueue queue,
+ int iterations,
+ TimeValue timeout)
+ {
+ name_ = "Consumer " + name;
+ queue_ = queue;
+ iterations_ = iterations;
+ timeout_ = timeout;
+ }
+
+ public void run()
+ {
+ //Set the random number generator seed to the current time in
+ //milliseconds.
+
+ Random random = new Random(System.currentTimeMillis());
+ Integer element;
+
+ for (int i = 0; i < iterations_; )
+ {
+ try
+ {
+ element = (Integer)queue_.dequeue(timeout_);
+ if (element != null)
+ {
+
+ System.out.print("Consumer::run() " + name_ + " dequeued " + element.toString());
+ System.out.println(" Queue size: " + queue_.size());
+
+ Thread.sleep(random.nextLong() % MAX_PAUSE);
+ }
+ else
+ {
+ System.out.println ("Null");
+ }
+ i++;
+ }
+ catch(Exception excp)
+ {
+ System.out.print ("Consumer::run() Exception: ");
+ System.out.println(excp);
+ }
+ }
+ }
+
+ public void requestStop()
+ {
+ stop_requested_ = true;
+ }
+}
diff --git a/java/tests/Concurrency/Condition/JoinableThreadGroup.java b/java/tests/Concurrency/Condition/JoinableThreadGroup.java
new file mode 100644
index 00000000000..c878eb026d3
--- /dev/null
+++ b/java/tests/Concurrency/Condition/JoinableThreadGroup.java
@@ -0,0 +1,24 @@
+package tests.Concurrency.Condition;
+
+public class JoinableThreadGroup extends ThreadGroup
+{
+ public JoinableThreadGroup(String name)
+ {
+ super(name);
+ }
+
+ public JoinableThreadGroup(ThreadGroup parent, String name)
+ {
+ super(parent, name);
+ }
+
+ public void join() throws InterruptedException
+ {
+ Thread list[] = new Thread[activeCount()];
+
+ enumerate(list, true);
+
+ for (int i = 0; i < list.length; i++)
+ list[i].join();
+ }
+}
diff --git a/java/tests/Concurrency/Condition/Makefile b/java/tests/Concurrency/Condition/Makefile
new file mode 100644
index 00000000000..fd1e6a93677
--- /dev/null
+++ b/java/tests/Concurrency/Condition/Makefile
@@ -0,0 +1,25 @@
+# Makefile
+
+.SUFFIXES: .java .class
+
+JACE_WRAPPER = ../../..
+CLASSDIR = $(JACE_WRAPPER)/classes
+
+CLASSPATH := $(CLASSDIR):$(CLASSPATH)
+
+all:
+ javac -d ${JACE_WRAPPER}/classes $(files)
+doc:
+ javadoc -d ${JACE_WRAPPER}/doc $(files) $(packages)
+
+files = \
+ QueueTest.java \
+ JoinableThreadGroup.java \
+ SimpleMessageQueue.java \
+ Producer.java \
+ Consumer.java
+
+packages = tests.Concurrency.Condition;
+
+realclean:
+ find ${JACE_WRAPPER}/classes/tests/Concurrency/Condition -name '*.class' -print | xargs ${RM}
diff --git a/java/tests/Concurrency/Condition/Producer.java b/java/tests/Concurrency/Condition/Producer.java
new file mode 100644
index 00000000000..d286e69fe91
--- /dev/null
+++ b/java/tests/Concurrency/Condition/Producer.java
@@ -0,0 +1,67 @@
+//File: Producer.java
+//Seth Widoff 8/8/96
+//This class attempts at random intervals to enqueue random elements
+//into a queue. If the queue is full the thread waits until an element
+//has been dequeued and another thread has invoked the notify() method.
+
+package tests.Concurrency.Condition;
+
+import ACE.Reactor.TimeValue;
+import java.util.Random;
+
+public class Producer implements Runnable
+{
+ //Maximum pause between enqueues (in milliseconds)
+ private static final int MAX_PAUSE = 1000;
+
+ private SimpleMessageQueue queue_;
+ private boolean stop_requested_ = false;
+ private String name_;
+ private int iterations_;
+ private TimeValue timeout_;
+
+ public Producer(String name,
+ SimpleMessageQueue queue,
+ int iterations,
+ TimeValue timeout)
+ {
+ name_ = "Producer " + name;
+ queue_ = queue;
+ iterations_ = iterations;
+ timeout_ = timeout;
+ }
+
+ public void run()
+ {
+ //Set the random number generator seed to the current time in milliseconds.
+ Random random = new Random(System.currentTimeMillis());
+ int element = 1;
+
+ for (int i = 0; i < iterations_; )
+ {
+ try
+ {
+ // element = random.nextInt();
+
+ queue_.enqueue((Object)new Integer(element), timeout_);
+ System.out.print("Producer::run() " + name_ + " enqueued " + element);
+ System.out.println(" Queue size: " + queue_.size());
+
+ Thread.sleep(random.nextLong() % MAX_PAUSE);
+ i++;
+ element++;
+ }
+ catch(Exception excp)
+ {
+ System.out.print("Producer::run() Exception: ");
+ System.out.println(excp);
+ }
+ }
+ }
+
+ public void requestStop()
+ {
+ stop_requested_ = true;
+ }
+}
+
diff --git a/java/tests/Concurrency/Condition/QueueTest.java b/java/tests/Concurrency/Condition/QueueTest.java
new file mode 100644
index 00000000000..e82a3eb26d2
--- /dev/null
+++ b/java/tests/Concurrency/Condition/QueueTest.java
@@ -0,0 +1,64 @@
+//File: QueueTest.java
+//Seth Widoff, 8/8/96
+//This class is a test method for the Producer and Consumer classes.
+//The main method takes as arguments the number of producers, the
+//number of consumers and the number of elements in the queue. It then
+//spawn the specified threads and starts them.
+
+package tests.Concurrency.Condition;
+
+import ACE.Reactor.TimeValue;
+
+public class QueueTest
+{
+ public static void main(String[] args)
+ {
+ if (args.length < 5)
+ {
+ System.out.println("Usage: java QueueTest <# producers> <# consumers> <# elements> <#iterations> <#timeout secs> <#timeout nano secs>");
+ System.exit(1);
+ }
+
+ int num_producers = Integer.parseInt(args[0]),
+ num_consumers = Integer.parseInt(args[1]),
+ num_elements = Integer.parseInt(args[2]),
+ num_iterations = Integer.parseInt(args[3]),
+ num_timeout_secs = Integer.parseInt(args[4]),
+ num_timeout_nano_secs = Integer.parseInt(args[5]);
+
+ if (num_elements < 1
+ || num_consumers < 1
+ || num_producers < 1)
+ {
+ System.out.println("All the parameters must be larger than zero.");
+ System.exit(1);
+ }
+
+ SimpleMessageQueue queue = new SimpleMessageQueue(num_elements);
+ Consumer[] consumers = new Consumer[num_consumers];
+ Producer[] producers = new Producer[num_producers];
+ JoinableThreadGroup thread_group = new JoinableThreadGroup("Producer Consumer");
+
+ for (int i = 0; i < num_producers; i++)
+ {
+ producers[i] = new Producer("Number " + (i + 1), queue, num_iterations, new TimeValue (num_timeout_secs, num_timeout_nano_secs));
+ new Thread(thread_group, producers[i]).start();
+ }
+
+ for (int i = 0; i < num_consumers; i++)
+ {
+ consumers[i] = new Consumer("Number " + (i + 1), queue, num_iterations, new TimeValue (num_timeout_secs, num_timeout_nano_secs));
+ new Thread(thread_group, consumers[i]).start();
+ }
+
+ try
+ {
+ thread_group.join();
+ }
+ catch(InterruptedException excp)
+ {
+ System.out.println("QueueTest::main");
+ System.out.println(excp);
+ }
+ }
+}
diff --git a/java/tests/Concurrency/Condition/SimpleMessageQueue.java b/java/tests/Concurrency/Condition/SimpleMessageQueue.java
new file mode 100644
index 00000000000..712789fb174
--- /dev/null
+++ b/java/tests/Concurrency/Condition/SimpleMessageQueue.java
@@ -0,0 +1,86 @@
+package tests.Concurrency.Condition;
+
+import ACE.ASX.TimeoutException;
+import ACE.Reactor.TimeValue;
+import ACE.Concurrency.*;
+
+public class SimpleMessageQueue
+{
+ private int num_items_ = 0;
+ private int head_ = 0, tail_ = 0;
+ private Object[] queue_;
+
+ private Mutex lock_ = new Mutex ();
+ private Condition notFull_ = new Condition (lock_);
+ private Condition notEmpty_ = new Condition (lock_);
+
+ public SimpleMessageQueue(int size)
+ {
+ queue_ = new Object[size];
+ }
+
+ public void enqueue(Object element, TimeValue timeout)
+ throws TimeoutException, InterruptedException
+ {
+ try
+ {
+ lock_.acquire ();
+ while (this.isFull ())
+ notFull_.Wait (timeout);
+
+ if (tail_ == queue_.length)
+ tail_ = 0;
+ queue_[tail_] = element;
+ tail_++;
+
+ num_items_++;
+ notEmpty_.signal ();
+ }
+ finally
+ {
+ lock_.release ();
+ }
+ }
+
+ public Object dequeue (TimeValue timeout)
+ throws TimeoutException, InterruptedException
+ {
+ Object return_value = null;
+
+ try
+ {
+ lock_.acquire ();
+ while (this.isEmpty ())
+ notEmpty_.Wait (timeout);
+
+ return_value = queue_[head_];
+ head_++;
+ if (head_ == queue_.length)
+ head_ = 0;
+
+ num_items_--;
+ notFull_.signal ();
+ }
+ finally
+ {
+ lock_.release ();
+ }
+ return return_value;
+ }
+
+ public boolean isEmpty()
+ {
+ return num_items_ == 0;
+ }
+
+ public boolean isFull()
+ {
+ return num_items_ == queue_.length;
+ }
+
+ public int size()
+ {
+ return num_items_;
+ }
+}
+