/*************************************************
*
* = PACKAGE
* JACE.ASX
*
* = FILENAME
* MessageQueue.java
*
*@author Prashant Jain
*
*************************************************/
package JACE.ASX;
import java.util.Date;
import JACE.OS.*;
import JACE.Reactor.*;
/**
 * Wait condition that holds while the associated MessageQueue reports
 * that it is not full.
 */
class NotFullCondition extends TimedWait
{
  /** Queue consulted each time the condition is evaluated. */
  private final MessageQueue queue_;

  /**
   * Bind this condition to a queue.  The same queue is also passed to
   * the TimedWait constructor.
   *@param mq queue whose fullness is tested
   */
  public NotFullCondition (MessageQueue mq)
  {
    super (mq);
    queue_ = mq;
  }

  /**
   * Evaluate the condition by asking the queue whether it is full.
   *@return true if the queue is not full, else false.
   */
  public boolean condition ()
  {
    boolean full = queue_.isFull ();
    return !full;
  }
}
/**
 * Wait condition that holds while the associated MessageQueue reports
 * that it is not empty.
 */
class NotEmptyCondition extends TimedWait
{
  /** Queue consulted each time the condition is evaluated. */
  private final MessageQueue queue_;

  /**
   * Bind this condition to a queue.  The same queue is also passed to
   * the TimedWait constructor.
   *@param mq queue whose emptiness is tested
   */
  public NotEmptyCondition (MessageQueue mq)
  {
    super (mq);
    queue_ = mq;
  }

  /**
   * Evaluate the condition by asking the queue whether it is empty.
   *@return true if the queue is not empty, else false.
   */
  public boolean condition ()
  {
    boolean empty = queue_.isEmpty ();
    return !empty;
  }
}
/**
*
* SYNOPSIS
*
* A thread-safe message queueing facility, modeled after the
* queueing facilities in System V STREAMS.
*
*
* DESCRIPTION
*
* MessageQueue is the central queueing facility for messages
* in the ASX framework. All operations are thread-safe, as it is intended
* to be used for inter-thread communication (e.g., a producer and
* consumer thread joined by a MessageQueue). The queue
* consists of MessageBlocks.
*
*
*@see MessageBlock
*@see TimeValue
*/
public class MessageQueue
{
  // = Constants.

  /** Default high watermark (16 K). */
  public final static int DEFAULT_HWM = 16 * 1024;

  /** Default low watermark. */
  public final static int DEFAULT_LWM = 0;

  /** Message queue was active before activate() or deactivate(). */
  public final static int WAS_ACTIVE = 1;

  /** Message queue was inactive before activate() or deactivate(). */
  public final static int WAS_INACTIVE = 2;

  // = Initialization.

  /**
   * Default constructor.  Uses DEFAULT_HWM and DEFAULT_LWM as the
   * high and low water marks.
   */
  public MessageQueue ()
  {
    this (DEFAULT_HWM, DEFAULT_LWM);
  }

  /**
   * Create a Message Queue with high and low water marks.
   *@param hwm High water mark (max number of bytes allowed in the
   * queue)
   *@param lwm Low water mark (min number of bytes in the queue)
   */
  public MessageQueue (int hwm, int lwm)
  {
    if (this.open (hwm, lwm) == -1)
      ACE.ERROR ("open");
  }

  /**
   * Initialize a Message Queue with high and low water marks.  The
   * queue is marked active and its byte count, message count and
   * head/tail links are reset, so any blocks previously linked into
   * the queue are no longer referenced by it.
   *@param hwm High water mark (max number of bytes allowed in the
   * queue)
   *@param lwm Low water mark (min number of bytes in the queue)
   *@return 0 (this implementation has no failure path).
   */
  public synchronized int open (int hwm, int lwm)
  {
    this.highWaterMark_ = hwm;
    this.lowWaterMark_ = lwm;
    this.deactivated_ = false;
    this.currentBytes_ = 0;
    this.currentCount_ = 0;
    this.head_ = null;
    this.tail_ = null;
    return 0;
  }

  // = For enqueue, enqueueHead, enqueueTail, and dequeueHead, if a
  // timeout is specified the caller will wait for at most the amount
  // of time in tv.  Calls return early, however, when the queue is
  // deactivated or when the time specified in tv elapses.

  /**
   * Enqueue a MessageBlock into the MessageQueue in accordance with
   * its priority (0 is lowest priority).  Note that the call will
   * block while the queue is full (unless the queue has been
   * deactivated).
   *@param newItem item to enqueue onto the Message Queue
   *@return -1 on failure, else the number of items still on the queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized int enqueue (MessageBlock newItem) throws InterruptedException
  {
    return this.enqueue (newItem, null);
  }

  /**
   * Enqueue a MessageBlock into the MessageQueue in accordance with
   * its priority (0 is lowest priority).  Note that the call will
   * return if the amount of time in tv expires or if the queue has
   * been deactivated.
   *@param newItem item to enqueue onto the Message Queue
   *@param tv amount of time (TimeValue) to wait before returning
   * (unless operation completes before); null means wait without a
   * timeout
   *@return -1 on failure (null item, timeout or deactivated queue),
   * else the number of items still on the queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized int enqueue (MessageBlock newItem,
                                   TimeValue tv) throws InterruptedException
  {
    // Reject a null item up front rather than blocking for room that
    // would never be used.
    if (newItem == null || !this.awaitCondition (this.notFullCondition_, tv))
      return -1;

    int result = this.enqueueInternal (newItem);

    // Tell any blocked threads that the queue has a new item!
    this.notEmptyCondition_.broadcast ();
    return result;
  }

  /**
   * Enqueue a MessageBlock at the end of the MessageQueue.  Note
   * that the call will block while the queue is full (unless the
   * queue has been deactivated).
   *@param newItem item to enqueue onto the Message Queue
   *@return -1 on failure, else the number of items still on the queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized int enqueueTail (MessageBlock newItem) throws InterruptedException
  {
    return this.enqueueTail (newItem, null);
  }

  /**
   * Enqueue a MessageBlock at the end of the MessageQueue.  Note
   * that the call will return if the amount of time in tv expires or
   * if the queue has been deactivated.
   *@param newItem item to enqueue onto the Message Queue
   *@param tv amount of time (TimeValue) to wait before returning
   * (unless operation completes before); null means wait without a
   * timeout
   *@return -1 on failure (null item, timeout or deactivated queue),
   * else the number of items still on the queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized int enqueueTail (MessageBlock newItem,
                                       TimeValue tv) throws InterruptedException
  {
    if (newItem == null || !this.awaitCondition (this.notFullCondition_, tv))
      return -1;

    int result = this.enqueueTailInternal (newItem);

    // Tell any blocked threads that the queue has a new item!
    this.notEmptyCondition_.broadcast ();
    return result;
  }

  /**
   * Enqueue a MessageBlock at the head of the MessageQueue.  Note
   * that the call will block while the queue is full (unless the
   * queue has been deactivated).
   *@param newItem item to enqueue onto the Message Queue
   *@return -1 on failure, else the number of items still on the queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized int enqueueHead (MessageBlock newItem) throws InterruptedException
  {
    return this.enqueueHead (newItem, null);
  }

  /**
   * Enqueue a MessageBlock at the head of the MessageQueue.  Note
   * that the call will return if the amount of time in tv expires or
   * if the queue has been deactivated.
   *@param newItem item to enqueue onto the Message Queue
   *@param tv amount of time (TimeValue) to wait before returning
   * (unless operation completes before); null means wait without a
   * timeout
   *@return -1 on failure (null item, timeout or deactivated queue),
   * else the number of items still on the queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized int enqueueHead (MessageBlock newItem,
                                       TimeValue tv) throws InterruptedException
  {
    if (newItem == null || !this.awaitCondition (this.notFullCondition_, tv))
      return -1;

    int result = this.enqueueHeadInternal (newItem);

    // Tell any blocked threads that the queue has a new item!
    this.notEmptyCondition_.broadcast ();
    return result;
  }

  /**
   * Dequeue and return the MessageBlock at the head of the
   * MessageQueue.  Note that the call will block while the queue is
   * empty (unless the queue has been deactivated).
   *@return null on failure, else the MessageBlock at the head of queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized MessageBlock dequeueHead () throws InterruptedException
  {
    return this.dequeueHead (null);
  }

  /**
   * Dequeue and return the MessageBlock at the head of the
   * MessageQueue.  Note that the call will return if the amount of
   * time in tv expires or if the queue has been deactivated.
   *@param tv amount of time (TimeValue) to wait before returning
   * (unless operation completes before); null means wait without a
   * timeout
   *@return null on failure (timeout or deactivated queue), else the
   * MessageBlock at the head of queue.
   *@exception java.lang.InterruptedException Interrupted while accessing queue
   */
  public synchronized MessageBlock dequeueHead (TimeValue tv) throws InterruptedException
  {
    if (!this.awaitCondition (this.notEmptyCondition_, tv))
      return null;

    MessageBlock result = this.dequeueHeadInternal ();

    // Tell any blocked threads that the queue has room for an item!
    this.notFullCondition_.broadcast ();
    return result;
  }

  // = Queue state.

  /**
   * Check if queue is full.
   *@return true if queue is full, else false.
   */
  public synchronized boolean isFull ()
  {
    return this.isFullInternal ();
  }

  /**
   * Check if queue is empty.
   *@return true if queue is empty, else false.
   */
  public synchronized boolean isEmpty ()
  {
    return this.isEmptyInternal ();
  }

  /**
   * Get total number of bytes on the queue.  Synchronized because the
   * counter is only ever updated under the queue's monitor.
   *@return total number of bytes on the queue
   */
  public synchronized int messageBytes ()
  {
    return this.currentBytes_;
  }

  /**
   * Get total number of messages on the queue.  Synchronized because
   * the counter is only ever updated under the queue's monitor.
   *@return total number of messages on the queue
   */
  public synchronized int messageCount ()
  {
    return this.currentCount_;
  }

  // = Flow control routines

  /**
   * Get high watermark.
   *@return high watermark
   */
  public synchronized int highWaterMark ()
  {
    return this.highWaterMark_;
  }

  /**
   * Set high watermark.  Threads already blocked on a full queue are
   * not notified by this call.
   *@param hwm high watermark
   */
  public synchronized void highWaterMark (int hwm)
  {
    this.highWaterMark_ = hwm;
  }

  /**
   * Get low watermark.
   *@return low watermark
   */
  public synchronized int lowWaterMark ()
  {
    return this.lowWaterMark_;
  }

  /**
   * Set low watermark.
   *@param lwm low watermark
   */
  public synchronized void lowWaterMark (int lwm)
  {
    this.lowWaterMark_ = lwm;
  }

  // = Activation control methods.

  /**
   * Deactivate the queue and broadcast on both wait conditions so
   * waiting threads get a chance to continue.  No messages are
   * removed from the queue, however.  Any enqueue or dequeue
   * operation called until the queue is activated again will
   * immediately return its failure value (-1 or null).
   *@return WAS_INACTIVE if queue was inactive before the call and
   * WAS_ACTIVE if queue was active before the call.
   */
  public synchronized int deactivate ()
  {
    return this.deactivateInternal ();
  }

  /**
   * Reactivate the queue so that threads can enqueue and dequeue
   * messages again.
   *@return WAS_INACTIVE if queue was inactive before the call and
   * WAS_ACTIVE if queue was active before the call.
   */
  public synchronized int activate ()
  {
    return this.activateInternal ();
  }

  // = Unsynchronized implementation routines.  The public methods
  // above call these while holding the queue's monitor.

  /**
   * Emptiness test used by isEmpty().
   *@return true if the byte count is at or below the low water mark
   * and there are no messages on the queue.
   */
  protected boolean isEmptyInternal ()
  {
    return this.currentBytes_ <= this.lowWaterMark_ && this.currentCount_ <= 0;
  }

  /**
   * Fullness test used by isFull().
   *@return true if the byte count exceeds the high water mark.
   */
  protected boolean isFullInternal ()
  {
    return this.currentBytes_ > this.highWaterMark_;
  }

  /**
   * Mark the queue inactive and broadcast on both wait conditions.
   *@return WAS_INACTIVE or WAS_ACTIVE, describing the state before
   * the call.
   */
  protected int deactivateInternal ()
  {
    int previousStatus = this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
    this.deactivated_ = true;
    // NOTE(review): NotFullCondition/NotEmptyCondition only test
    // isFull()/isEmpty() and never look at the deactivated flag.  If
    // TimedWait re-evaluates condition() after a broadcast, a thread
    // blocked on a still-full or still-empty queue may go straight
    // back to waiting -- confirm against TimedWait.
    this.notFullCondition_.broadcast ();
    this.notEmptyCondition_.broadcast ();
    return previousStatus;
  }

  /**
   * Mark the queue active again.
   *@return WAS_INACTIVE or WAS_ACTIVE, describing the state before
   * the call.
   */
  protected int activateInternal ()
  {
    int previousStatus = this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
    this.deactivated_ = false;
    return previousStatus;
  }

  /**
   * Link an item after the current tail and update the counters.
   *@param newItem item to append
   *@return -1 if newItem is null, else the new message count.
   */
  protected int enqueueTailInternal (MessageBlock newItem)
  {
    if (newItem == null)
      return -1;

    newItem.next (null);
    newItem.prev (this.tail_);
    if (this.tail_ == null)
      // List was empty, so the new item is also the head.
      this.head_ = newItem;
    else
      this.tail_.next (newItem);
    this.tail_ = newItem;

    return this.recordEnqueued (newItem);
  }

  /**
   * Link an item before the current head and update the counters.
   *@param newItem item to prepend
   *@return -1 if newItem is null, else the new message count.
   */
  protected int enqueueHeadInternal (MessageBlock newItem)
  {
    if (newItem == null)
      return -1;

    newItem.prev (null);
    newItem.next (this.head_);
    if (this.head_ == null)
      // List was empty, so the new item is also the tail.
      this.tail_ = newItem;
    else
      this.head_.prev (newItem);
    this.head_ = newItem;

    return this.recordEnqueued (newItem);
  }

  /**
   * Link an item into the list according to its priority and update
   * the counters.  The item is placed immediately before the first
   * queued block whose priority is less than or equal to its own, or
   * at the tail if there is no such block.
   *@param newItem item to insert
   *@return -1 if newItem is null, else the new message count.
   */
  protected int enqueueInternal (MessageBlock newItem)
  {
    if (newItem == null)
      return -1;

    // Walk from the head remembering the block we just passed, so the
    // insertion point does not depend on the queued blocks' prev links.
    MessageBlock predecessor = null;
    MessageBlock successor;
    for (successor = this.head_;
         successor != null;
         successor = successor.next ())
      {
        if (successor.msgPriority () <= newItem.msgPriority ())
          // Found an item of equal or lower priority than newItem.
          break;
        predecessor = successor;
      }

    if (predecessor == null)
      // Empty queue, or the new item goes before the current head.
      return this.enqueueHeadInternal (newItem);

    if (successor == null)
      // Every queued item has higher priority: append after the tail.
      return this.enqueueTailInternal (newItem);

    // Splice the message right before the item of equal or lower
    // priority.
    newItem.prev (predecessor);
    newItem.next (successor);
    predecessor.next (newItem);
    successor.prev (newItem);

    return this.recordEnqueued (newItem);
  }

  /**
   * Unlink the head item and update the counters.  The returned block
   * has its next/prev links cleared, and the new head's prev link is
   * cleared so that it no longer refers to the removed block.
   *@return the former head of the list, or null if the list is empty.
   */
  protected MessageBlock dequeueHeadInternal ()
  {
    MessageBlock firstItem = this.head_;
    if (firstItem == null)
      // Nothing linked; avoid dereferencing a null head.
      return null;

    this.head_ = firstItem.next ();
    if (this.head_ == null)
      this.tail_ = null;
    else
      // The new head must not keep pointing back at the removed block.
      this.head_.prev (null);

    // Fully detach the removed block from the list.
    firstItem.next (null);
    firstItem.prev (null);

    // Subtract off all of the bytes associated with this message.
    this.currentBytes_ -= chainBytes (firstItem);
    this.currentCount_--;
    return firstItem;
  }

  // = Private helpers.

  /**
   * Wait on the given condition while the queue is active.
   *@param condition wait condition to block on
   *@param tv maximum time to wait, or null to wait without a timeout
   *@return true if the wait completed and the queue is still active;
   * false if the queue was deactivated before or during the wait, or
   * the wait timed out.
   */
  private boolean awaitCondition (TimedWait condition,
                                  TimeValue tv) throws InterruptedException
  {
    if (this.deactivated_)
      return false;

    try
      {
        if (tv == null) // Need to do a blocking wait
          condition.timedWait ();
        else // Need to do a timed wait
          condition.timedWait (tv);
      }
    catch (TimeoutException e)
      {
        // Timing out is an expected outcome; report it as a failure.
        return false;
      }

    // Check again if queue is still active
    return !this.deactivated_;
  }

  /**
   * Update the byte and message counters for a block that has just
   * been linked into the list.
   *@return the new message count.
   */
  private int recordEnqueued (MessageBlock item)
  {
    this.currentBytes_ += chainBytes (item);
    this.currentCount_++;
    return this.currentCount_;
  }

  /**
   * Sum the sizes of a message and all of its continuation blocks.
   *@return 0 for a message of type MB_OBJECT, else the total size of
   * the blocks reached by following cont() from the given message.
   */
  private static int chainBytes (MessageBlock message)
  {
    if (message.msgType () == MessageType.MB_OBJECT)
      return 0;

    // Make sure to count *all* the bytes in a composite message!!!
    int total = 0;
    for (MessageBlock part = message;
         part != null;
         part = part.cont ())
      total += part.size ();
    return total;
  }

  // = State.

  // Greatest number of bytes before blocking.
  private int highWaterMark_;

  // Lowest number of bytes before unblocking occurs.
  private int lowWaterMark_;

  // Indicates that the queue is inactive.
  private boolean deactivated_;

  // Current number of bytes in the queue.
  private int currentBytes_;

  // Current number of messages in the queue.
  private int currentCount_;

  // Head of MessageBlock list.
  private MessageBlock head_;

  // Tail of MessageBlock list.
  private MessageBlock tail_;

  // The Delegated Notification mechanisms.
  private NotFullCondition notFullCondition_ = new NotFullCondition (this);
  private NotEmptyCondition notEmptyCondition_ = new NotEmptyCondition (this);
}