/*************************************************
*
* = PACKAGE
* JACE.Concurrency
*
* = FILENAME
* Token.java
*
*@author Prashant Jain
*
*************************************************/
package JACE.Concurrency;
import java.util.*;
import JACE.ASX.*;
class WaitObject extends TimedWait
{
public boolean condition ()
{
return this.condition_;
}
public void condition (boolean c)
{
this.condition_ = c;
}
private boolean condition_ = false;
}
/**
*
* SYNOPSIS
*
* Class that acquires, renews, and releases a synchronization
* token that is serviced in strict FIFO ordering.
*
*
*
* DESCRIPTION
*
* This is a general-purpose synchronization mechanism that offers
* several benefits. For example, it implements "recursive mutex"
* semantics, where a thread that owns the token can reacquire it
* without deadlocking. In addition, threads that are blocked
* awaiting the token are serviced in strict FIFO order as other
* threads release the token. The solution makes use of the
* Specific Notification pattern presented by Tom Cargill in
* "Specific Notification for Java Thread Synchronization," PLoP96.
*
*/
public class Token
{
/**
* Acquire the token. Note that this will block. The method uses
* synchronized blocks internally to avoid race conditions.
*@return 0 if acquires without calling
* 1 if is called.
* -1 if failure occurs
*@exception InterruptedException exception during wait
*/
public int acquire () throws InterruptedException
{
try
{
return this.acquire (new TimeValue ());
}
catch (TimeoutException e)
{
// This really shouldn't happen since we are supposed to
// block.
return -1;
}
}
/**
* Acquire the token. Wait for timeout amount of time. The method
* uses synchronized blocks internally to avoid race conditions.
*@param timeout Amount of time to wait for in trying to acquire the
* token.
*@return 0 if acquires without calling
* 1 if is called.
* -1 if failure occurs
*@exception TimeoutException exception if timeout occurs
*@exception InterruptedException exception during wait
*/
public int acquire (TimeValue timeout) throws InterruptedException, TimeoutException
{
int result = 0;
WaitObject snl = new WaitObject ();
boolean mustWait;
synchronized (snl)
{
synchronized (this)
{
mustWait = !this.snq_.isEmpty ();
if (mustWait &&
Thread.currentThread ().toString ().compareTo (this.owner_) == 0)
{
// I am the one who has the token. So just increment
// the nesting level
this.nestingLevel_++;
return result;
}
// Add local lock to the queue
this.snq_.addElement (snl);
}
if (mustWait)
{
result = 1;
// Call sleep hook
sleepHook ();
snl.timedWait (timeout); // Do a blocking wait
}
// Set the owner of the token
this.owner_ = Thread.currentThread ().toString ();
}
return result;
}
/**
* Try to acquire the token. Implements a non-blocking acquire.
*@return 0 if acquires without calling
* 1 if is called.
* -1 if failure occurs
*/
public synchronized int tryAcquire ()
{
int result = 0;
if (!this.snq_.isEmpty ())
{
// No one has the token, so acquire it
this.snq_.addElement (new WaitObject ());
}
// Check if I am the one holding the token.
else if (Thread.currentThread ().toString ().compareTo (this.owner_) == 0)
{
this.nestingLevel_++;
}
// Someone else has the token.
else
{
// Will have to block to acquire the token, so call
// sleepHook and return
sleepHook ();
result = 1;
}
return result;
}
/**
* Method that is called before a thread goes to sleep in an
* acquire(). This should be overridden by a subclass to define
* the appropriate behavior before acquire() goes to sleep.
* By default, this is a no-op.
*/
public void sleepHook ()
{
}
/**
* An optimized method that efficiently reacquires the token if no
* other threads are waiting. This is useful for situations where
* you don't want to degrade the quality of service if there are
* other threads waiting to get the token.
*@param requeuePosition Position in the queue where to insert the
* lock. If requeuePosition == -1 and there are other threads
* waiting to obtain the token we are queued at the end of the list
* of waiters. If requeuePosition > -1 then it indicates how many
* entries to skip over before inserting our thread into the list of
* waiters (e.g.,requeuePosition == 0 means "insert at front of the
* queue").
*@exception InterruptedException exception during wait
*/
public void renew (int requeuePosition) throws InterruptedException
{
try
{
this.renew (requeuePosition, new TimeValue ());
}
catch (TimeoutException e)
{
// This really shouldn't happen since we are supposed to
// block.
}
}
/**
* An optimized method that efficiently reacquires the token if no
* other threads are waiting. This is useful for situations where
* you don't want to degrade the quality of service if there are
* other threads waiting to get the token.
*@param requeuePosition Position in the queue where to insert the
* lock. If requeuePosition == -1 and there are other threads
* waiting to obtain the token we are queued at the end of the list
* of waiters. If requeuePosition > -1 then it indicates how many
* entries to skip over before inserting our thread into the list of
* waiters (e.g.,requeuePosition == 0 means "insert at front of the
* queue").
*@param timeout Amount of time to wait for in trying to acquire the
* token.
*@exception TimeoutException exception if timeout occurs
*@exception InterruptedException exception during wait
*/
public void renew (int requeuePosition, TimeValue timeout)
throws InterruptedException, TimeoutException
{
WaitObject snl = null;
int saveNestingLevel = 0;
synchronized (this)
{
// Check if there is a thread waiting to acquire the token. If
// not or if requeuePosition == 0, then we don't do anything
// and we simply keep the token.
if (this.snq_.size () > 1 && requeuePosition != 0)
{
// Save the nesting level
saveNestingLevel = this.nestingLevel_;
this.nestingLevel_ = 0;
// Reinsert ourselves at requeuePosition in the queue
snl = (WaitObject) this.snq_.firstElement ();
this.snq_.removeElementAt (0);
if (requeuePosition < 0)
this.snq_.addElement (snl); // Insert at end
else
this.snq_.insertElementAt (snl, requeuePosition);
synchronized (this.snq_.firstElement ())
{
// Notify the first waiting thread in the queue
WaitObject obj = (WaitObject) this.snq_.firstElement ();
// Set its condition to be true so that it falls out
// of the for loop
obj.condition (true);
// Now signal the thread
obj.signal ();
}
}
}
// Check if we reinserted the lock in the queue and therefore need
// to do a wait
if (snl != null)
{
synchronized (snl)
{
// Set the condition to be false so that we can begin the
// wait
snl.condition (false);
// Do a blocking wait
snl.timedWait (timeout);
}
// Restore the nesting level and current owner of the lock
this.nestingLevel_ = saveNestingLevel;
this.owner_ = Thread.currentThread ().toString ();
}
}
/**
* Release the token.
*/
public synchronized void release ()
{
// Check if nestingLevel > 0 and if so, decrement it
if (this.nestingLevel_ > 0)
this.nestingLevel_--;
else
{
this.snq_.removeElementAt (0);
if (!this.snq_.isEmpty ())
{
synchronized (this.snq_.firstElement ())
{
// Notify the first waiting thread in the queue
WaitObject obj = (WaitObject) this.snq_.firstElement ();
// Set its condition to be true so that it falls out
// of the for loop
obj.condition (true);
// Now signal the thread
obj.signal ();
}
}
}
}
private Vector snq_ = new Vector ();
// Vector of lock objects
private int nestingLevel_ = 0;
// Current Nesting Level
private String owner_ = null;
// Current owner of the token.
}