diff options
author | irfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-11-12 08:17:47 +0000 |
---|---|---|
committer | irfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-11-12 08:17:47 +0000 |
commit | 7be1052a518d4559aef31963d8016bcf8451379f (patch) | |
tree | a9056d1b2240501a132c4a235e959f1728e06181 | |
parent | 7f179caaef13f7c23c155f11a78312e657458e8f (diff) | |
download | ATCD-7be1052a518d4559aef31963d8016bcf8451379f.tar.gz |
new implementation of traditional condition variables in java
complete rewrite of RWMutex in terms of "traditional" conditional
variables
new files to test condition variables
-rw-r--r-- | java/tests/Concurrency/Condition/Consumer.java | 73 | ||||
-rw-r--r-- | java/tests/Concurrency/Condition/JoinableThreadGroup.java | 24 | ||||
-rw-r--r-- | java/tests/Concurrency/Condition/Makefile | 25 | ||||
-rw-r--r-- | java/tests/Concurrency/Condition/Producer.java | 67 | ||||
-rw-r--r-- | java/tests/Concurrency/Condition/QueueTest.java | 64 | ||||
-rw-r--r-- | java/tests/Concurrency/Condition/SimpleMessageQueue.java | 86 |
6 files changed, 339 insertions, 0 deletions
diff --git a/java/tests/Concurrency/Condition/Consumer.java b/java/tests/Concurrency/Condition/Consumer.java new file mode 100644 index 00000000000..029cee2158a --- /dev/null +++ b/java/tests/Concurrency/Condition/Consumer.java @@ -0,0 +1,73 @@ +//File: Consumer.java +//Seth Widoff 8/8/96 +//This class attempts at random intervals to dequeue random elements +//from a queue. If the queue is empty the thread waits until an element +//has been enqueued and another thread has invoked the notify() method. + +package tests.Concurrency.Condition; + +import ACE.Reactor.TimeValue; +import java.util.Random; + +public class Consumer implements Runnable +{ + //Maximum pause between dequeues (in milliseconds) + private static final int MAX_PAUSE = 1000; + + private SimpleMessageQueue queue_; + private boolean stop_requested_ = false; + private String name_; + private int iterations_; + private TimeValue timeout_; + + public Consumer(String name, + SimpleMessageQueue queue, + int iterations, + TimeValue timeout) + { + name_ = "Consumer " + name; + queue_ = queue; + iterations_ = iterations; + timeout_ = timeout; + } + + public void run() + { + //Set the random number generator seed to the current time in + //milliseconds. + + Random random = new Random(System.currentTimeMillis()); + Integer element; + + for (int i = 0; i < iterations_; ) + { + try + { + element = (Integer)queue_.dequeue(timeout_); + if (element != null) + { + + System.out.print("Consumer::run() " + name_ + " dequeued " + element.toString()); + System.out.println(" Queue size: " + queue_.size()); + + Thread.sleep(random.nextLong() % MAX_PAUSE); + } + else + { + System.out.println ("Null"); + } + i++; + } + catch(Exception excp) + { + System.out.print ("Consumer::run() Exception: "); + System.out.println(excp); + } + } + } + + public void requestStop() + { + stop_requested_ = true; + } +} diff --git a/java/tests/Concurrency/Condition/JoinableThreadGroup.java b/java/tests/Concurrency/Condition/JoinableThreadGroup.java new file mode 100644 index 00000000000..c878eb026d3 --- /dev/null +++ b/java/tests/Concurrency/Condition/JoinableThreadGroup.java @@ -0,0 +1,24 @@ +package tests.Concurrency.Condition; + +public class JoinableThreadGroup extends ThreadGroup +{ + public JoinableThreadGroup(String name) + { + super(name); + } + + public JoinableThreadGroup(ThreadGroup parent, String name) + { + super(parent, name); + } + + public void join() throws InterruptedException + { + Thread list[] = new Thread[activeCount()]; + + enumerate(list, true); + + for (int i = 0; i < list.length; i++) + list[i].join(); + } +} diff --git a/java/tests/Concurrency/Condition/Makefile b/java/tests/Concurrency/Condition/Makefile new file mode 100644 index 00000000000..fd1e6a93677 --- /dev/null +++ b/java/tests/Concurrency/Condition/Makefile @@ -0,0 +1,25 @@ +# Makefile + +.SUFFIXES: .java .class + +JACE_WRAPPER = ../../.. +CLASSDIR = $(JACE_WRAPPER)/classes + +CLASSPATH := $(CLASSDIR):$(CLASSPATH) + +all: + javac -d ${JACE_WRAPPER}/classes $(files) +doc: + javadoc -d ${JACE_WRAPPER}/doc $(files) $(packages) + +files = \ + QueueTest.java \ + JoinableThreadGroup.java \ + SimpleMessageQueue.java \ + Producer.java \ + Consumer.java + +packages = tests.Concurrency.Condition; + +realclean: + find ${JACE_WRAPPER}/classes/tests/Concurrency/Condition -name '*.class' -print | xargs ${RM} diff --git a/java/tests/Concurrency/Condition/Producer.java b/java/tests/Concurrency/Condition/Producer.java new file mode 100644 index 00000000000..d286e69fe91 --- /dev/null +++ b/java/tests/Concurrency/Condition/Producer.java @@ -0,0 +1,67 @@ +//File: Producer.java +//Seth Widoff 8/8/96 +//This class attempts at random intervals to enqueue random elements +//into a queue. If the queue is full the thread waits until an element +//has been dequeued and another thread has invoked the notify() method. + +package tests.Concurrency.Condition; + +import ACE.Reactor.TimeValue; +import java.util.Random; + +public class Producer implements Runnable +{ + //Maximum pause between enqueues (in milliseconds) + private static final int MAX_PAUSE = 1000; + + private SimpleMessageQueue queue_; + private boolean stop_requested_ = false; + private String name_; + private int iterations_; + private TimeValue timeout_; + + public Producer(String name, + SimpleMessageQueue queue, + int iterations, + TimeValue timeout) + { + name_ = "Producer " + name; + queue_ = queue; + iterations_ = iterations; + timeout_ = timeout; + } + + public void run() + { + //Set the random number generator seed to the current time in milliseconds. + Random random = new Random(System.currentTimeMillis()); + int element = 1; + + for (int i = 0; i < iterations_; ) + { + try + { + // element = random.nextInt(); + + queue_.enqueue((Object)new Integer(element), timeout_); + System.out.print("Producer::run() " + name_ + " enqueued " + element); + System.out.println(" Queue size: " + queue_.size()); + + Thread.sleep(random.nextLong() % MAX_PAUSE); + i++; + element++; + } + catch(Exception excp) + { + System.out.print("Producer::run() Exception: "); + System.out.println(excp); + } + } + } + + public void requestStop() + { + stop_requested_ = true; + } +} + diff --git a/java/tests/Concurrency/Condition/QueueTest.java b/java/tests/Concurrency/Condition/QueueTest.java new file mode 100644 index 00000000000..e82a3eb26d2 --- /dev/null +++ b/java/tests/Concurrency/Condition/QueueTest.java @@ -0,0 +1,64 @@ +//File: QueueTest.java +//Seth Widoff, 8/8/96 +//This class is a test method for the Producer and Consumer classes. +//The main method takes as arguments the number of producers, the +//number of consumers and the number of elements in the queue. It then +//spawn the specified threads and starts them. + +package tests.Concurrency.Condition; + +import ACE.Reactor.TimeValue; + +public class QueueTest +{ + public static void main(String[] args) + { + if (args.length < 5) + { + System.out.println("Usage: java QueueTest <# producers> <# consumers> <# elements> <#iterations> <#timeout secs> <#timeout nano secs>"); + System.exit(1); + } + + int num_producers = Integer.parseInt(args[0]), + num_consumers = Integer.parseInt(args[1]), + num_elements = Integer.parseInt(args[2]), + num_iterations = Integer.parseInt(args[3]), + num_timeout_secs = Integer.parseInt(args[4]), + num_timeout_nano_secs = Integer.parseInt(args[5]); + + if (num_elements < 1 + || num_consumers < 1 + || num_producers < 1) + { + System.out.println("All the parameters must be larger than zero."); + System.exit(1); + } + + SimpleMessageQueue queue = new SimpleMessageQueue(num_elements); + Consumer[] consumers = new Consumer[num_consumers]; + Producer[] producers = new Producer[num_producers]; + JoinableThreadGroup thread_group = new JoinableThreadGroup("Producer Consumer"); + + for (int i = 0; i < num_producers; i++) + { + producers[i] = new Producer("Number " + (i + 1), queue, num_iterations, new TimeValue (num_timeout_secs, num_timeout_nano_secs)); + new Thread(thread_group, producers[i]).start(); + } + + for (int i = 0; i < num_consumers; i++) + { + consumers[i] = new Consumer("Number " + (i + 1), queue, num_iterations, new TimeValue (num_timeout_secs, num_timeout_nano_secs)); + new Thread(thread_group, consumers[i]).start(); + } + + try + { + thread_group.join(); + } + catch(InterruptedException excp) + { + System.out.println("QueueTest::main"); + System.out.println(excp); + } + } +} diff --git a/java/tests/Concurrency/Condition/SimpleMessageQueue.java b/java/tests/Concurrency/Condition/SimpleMessageQueue.java new file mode 100644 index 00000000000..712789fb174 --- /dev/null +++ b/java/tests/Concurrency/Condition/SimpleMessageQueue.java @@ -0,0 +1,86 @@ +package tests.Concurrency.Condition; + +import ACE.ASX.TimeoutException; +import ACE.Reactor.TimeValue; +import ACE.Concurrency.*; + +public class SimpleMessageQueue +{ + private int num_items_ = 0; + private int head_ = 0, tail_ = 0; + private Object[] queue_; + + private Mutex lock_ = new Mutex (); + private Condition notFull_ = new Condition (lock_); + private Condition notEmpty_ = new Condition (lock_); + + public SimpleMessageQueue(int size) + { + queue_ = new Object[size]; + } + + public void enqueue(Object element, TimeValue timeout) + throws TimeoutException, InterruptedException + { + try + { + lock_.acquire (); + while (this.isFull ()) + notFull_.Wait (timeout); + + if (tail_ == queue_.length) + tail_ = 0; + queue_[tail_] = element; + tail_++; + + num_items_++; + notEmpty_.signal (); + } + finally + { + lock_.release (); + } + } + + public Object dequeue (TimeValue timeout) + throws TimeoutException, InterruptedException + { + Object return_value = null; + + try + { + lock_.acquire (); + while (this.isEmpty ()) + notEmpty_.Wait (timeout); + + return_value = queue_[head_]; + head_++; + if (head_ == queue_.length) + head_ = 0; + + num_items_--; + notFull_.signal (); + } + finally + { + lock_.release (); + } + return return_value; + } + + public boolean isEmpty() + { + return num_items_ == 0; + } + + public boolean isFull() + { + return num_items_ == queue_.length; + } + + public int size() + { + return num_items_; + } +} + |