diff options
author | pjain <pjain@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-11-12 00:51:48 +0000 |
---|---|---|
committer | pjain <pjain@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-11-12 00:51:48 +0000 |
commit | e8d6c3f71e9fc8d3d58e7d0e7cacbdd5c9737753 (patch) | |
tree | 92ca889f4133af7bcbc7a2f2c7c745b74f6178fb | |
parent | 42514f4dad71286732135a3436220b3672bbcf8c (diff) | |
download | ATCD-e8d6c3f71e9fc8d3d58e7d0e7cacbdd5c9737753.tar.gz |
image files
source files
31 files changed, 4474 insertions, 0 deletions
diff --git a/java/src/ACE.java b/java/src/ACE.java new file mode 100644 index 00000000000..4e558fe4609 --- /dev/null +++ b/java/src/ACE.java @@ -0,0 +1,164 @@ +/************************************************* + * + * = PACKAGE + * ACE.OS + * + * = FILENAME + * ACE.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.OS; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + * <blockquote>Constants, utility "functions", etc.</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * Defines default constants for ACE. Many of these are used for the + * ACE tests and applications. You may want to change some of these to + * correspond to your environment. Also, routines for error handling, + * debugging and bit manipulation are included. + *</blockquote> + * + * <h2>NOTES</h2> + *<blockquote> + * This class is non-instantiable, and intended only to provide a constrained + * namespace. + *</blockquote> + */ +public abstract class ACE +{ + /** + * Default port on which a server listens for connections. + */ + public static final int DEFAULT_SERVER_PORT = 10002; + + /** + * Default name to use for a thread group. + */ + public static final String DEFAULT_THREAD_GROUP_NAME = "ace_thread_group"; + + /** + * Disable debugging. Once debugging is disabled, all ACE.DEBUG + * statements would be ignored. + */ + public static final void disableDebugging () + { + ACE.debug_ = false; + } + + /** + * Enable debugging. Once debugging is enabled, all ACE.DEBUG + * statements get printed. + */ + public static final void enableDebugging () + { + ACE.debug_ = true; + } + + /** + * Print the string representation of Java Exception. + *@param e Java exception + */ + public static final void ERROR (Exception e) + { + System.err.println (e); + } + + /** + * Print the string being passed in. + *@param s a Java String + */ + public static final void ERROR (String s) + { + System.err.println (s); + } + + /** + * Print the string being passed in. + *@param s A Java String + *@return Error value passed in + */ + public static final int ERROR_RETURN (String s, int errorVal) + { + System.err.println (s); + return errorVal; + } + + /** + * Print the string being passed in. Note the behavior will vary + * depending upon whether debugging is enabled or disabled. + *@param s a Java String + */ + public static final void DEBUG (String s) + { + if (ACE.debug_) + System.out.println (s); + } + + /** + * Flush out any data that may be buffered. + */ + public static final void FLUSH () + { + System.out.flush (); + } + + /** + * Set the bits of WORD using BITS as the mask. + *@param WORD the bits to be set. + *@param BITS the mask to use. + *@return The value obtained after setting the bits. + */ + public static final long SET_BITS (long WORD, long BITS) + { + return WORD | BITS; + } + + /** + * Clear the bits of WORD using BITS as the mask. + *@param WORD the bits to clear. + *@param BITS the mask to use. + *@return The value obtained after clearing the bits. + */ + public static final long CLR_BITS (long WORD, long BITS) + { + return WORD & ~BITS; + } + + /** + * Check if bits are enabled in WORD. + *@param WORD the bits to check. + *@param BIT the bit to check to see if it is enabled or not. + *@return true if bit is enabled, false otherwise. + */ + public static final boolean BIT_ENABLED (long WORD, long BIT) + { + return (WORD & BIT) != 0; + } + + /** + * Check if bits are disabled in WORD. + *@param WORD the bits to check. + *@param BIT the bit to check to see if it is disabled or not. + *@return true if bit is disabled, false otherwise. + */ + public static final boolean BIT_DISABLED (long WORD, long BIT) + { + return (WORD & BIT) == 0; + } + + // Debug flag (turn debugging on/off) + private static boolean debug_ = true; + + // Default private constructor to avoid instantiation + private ACE () + { + } +} + + diff --git a/java/src/AcceptStrategy.java b/java/src/AcceptStrategy.java new file mode 100644 index 00000000000..cf35c4fb460 --- /dev/null +++ b/java/src/AcceptStrategy.java @@ -0,0 +1,89 @@ +/************************************************* + * + * = PACKAGE + * ACE.Connection + * + * = FILENAME + * AcceptStrategy.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Connection; + +import java.io.*; +import java.net.*; +import ACE.SOCK_SAP.*; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Interface for specifying a passive connection + * acceptance strategy for a + * <a href="ACE.Connection.SvcHandler.html"><tt>SvcHandler</tt></a> + * . + *</blockquote> + * + * <h2>DESCRIPTION</h2> + * + *<blockquote> + * This class provides a strategy that manages passive + * connection setup for an application, and can be extended + * to define new strategies. + *</blockquote> + * + * @see SvcHandler + * @see Acceptor + */ + +public class AcceptStrategy +{ + /** + * Create an instance of Accept Strategy. + *@param port port number where the server will listen for connections + *@exception IOException couldn't open port + */ + AcceptStrategy (int port) throws IOException + { + this.open (port); + } + + /** + * Initialize AcceptStrategy. + *@param port port number where the server will listen for connections + *@exception IOException couldn't open port + */ + public void open (int port) throws IOException + { + // Create a new SOCK_Acceptor to accept client connections + this.sockAcceptor_ = new SOCKAcceptor (port); + } + + /** + * Accept connections into the SvcHandler. Note that subclasses + * should overwrite this method to provide a different accept + * strategy. + *@param sh Svc Handler in which to accept the connection + *@exception SocketException + *@exception IOException + *@return 0 + */ + public int acceptSvcHandler (SvcHandler sh) throws + SocketException, IOException + { + // Create a new stream + SOCKStream sockStream = new SOCKStream (); + + // Block in accept. Returns when a connection shows up + this.sockAcceptor_.accept (sockStream); + + // Set the streams for the new handler + sh.setHandle (sockStream); + return 0; + } + + // Our connection acceptance factory + private SOCKAcceptor sockAcceptor_; + +} diff --git a/java/src/Acceptor.java b/java/src/Acceptor.java new file mode 100644 index 00000000000..b47b0dcf875 --- /dev/null +++ b/java/src/Acceptor.java @@ -0,0 +1,213 @@ +/************************************************* + * + * = PACKAGE + * ACE.Connection + * + * = FILENAME + * Acceptor.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Connection; + +import java.io.*; +import java.net.*; +import ACE.OS.*; +import ACE.SOCK_SAP.*; +import ACE.ServiceConfigurator.*; + +/** + * <hr> + * <p><h2>SYNOPSIS</h2> + * + * <blockquote>Abstract factory for creating a service handler + * (<a href="ACE.Connection.SvcHandler.html"><tt>SvcHandler</tt></a>), + * accepting into the + * <a href="ACE.Connection.SvcHandler.html"><tt>SvcHandler</tt></a>, and activating the + * <a href="ACE.Connection.SvcHandler.html"><tt>SvcHandler</tt></a>.</blockquote> + * + * <p><h2>DESCRIPTION</h2> + * + * <blockquote>Implements the basic strategy for passively establishing + * connections with applications. The <tt>Acceptor</tt> + * is a factory for <tt>SvcHandler</tt> instances, and, by default + * generates a new <tt>SvcHandler</tt> instance for each connection + * esablished.</blockquote> + * + * <p> + * + * <blockquote> The user of this class <em>must</em> provide a + * reference to a handler factory prior to calling <a + * href="#accept()"><tt>accept</tt></a>, or an exception will be + * thrown. The handler factory is identified by the meta-class for + * the <tt>SvcHandler</tt>, and is typically obtained by calling <a + * href="java.lang.Class#classForName(java.lang.String)"><tt>Class.classForName("SvcHandler")</tt></a>. + * </blockquote> + * + * <p> + * + * <blockquote> TCP is the transport mechanism used, via + * <a href="ACE.SOCK_SAP.SOCKAcceptor.html#_top_"><tt>SOCKAcceptor</tt></a>, + * <em>et.al.</em> The SvcHandler is instantiated with a concrete type + * that performs the application-specific service. </blockquote> + * + * <h2>NOTES</h2> + * + * <blockquote> This class is not directly related to the + * <tt>AcceptorStrategy</tt> class.</blockquote> + * + * + * @see java.lang.Class,ACE.Connection.SvcHandler,ACE.SOCK_SAP.SOCKAcceptor */ +public class Acceptor extends ServiceObject +{ + /** + * Create an instance of Acceptor. Default constructor. Note that if + * an instance is created via this method, <tt>setHandlerFactory</tt> + * must be called prior to using <tt>accept</tt>. + * + * @see ACE.Connection.Acceptor.setHandlerFactory + */ + public Acceptor () + { + } + + /** + * Create an instance of Acceptor. + *@param handlerFactory meta-class reference used to create + * an instance of a SvcHandler when a connection is accepted + * (typically obtained by calling <tt>Class.classForName</tt>). + * + *@see java.lang.Class.classForName + */ + public Acceptor (Class handlerFactory) + { + this.handlerFactory_ = handlerFactory; + } + + /** + * Set the handler factory. This is provided to aid the default + * no-arg constructor. + *@param handlerFactory meta-class reference used to create + * an instance of a SvcHandler when a connection is accepted + * (typically obtained by calling <tt>Class.classForName</tt>). + * + *@see java.lang.Class.classForName + */ + public void setHandlerFactory (Class handlerFactory) + { + this.handlerFactory_ = handlerFactory; + } + + /** + * Initialize the Acceptor. + *@param port TCP port number where the Acceptor will listen for connections + *@exception IOException socket level exception + */ + public void open (int port) throws IOException + { + this.sockAcceptor_ = new SOCKAcceptor (port); + } + + /** + * Template method for accepting connections. Delegates operational + * activities to the following bridge methods: + * <ul> + * <li><tt>makeSvcHandler</tt></li> + * <li><tt>acceptSvcHandler</tt></li> + * <li><tt>activateSvcHandler</tt></li> + * </ul> + * + * <p> + * + * The method first obtains a <tt>SvcHandler</tt> via + * <tt>makeSvcHandler</tt>, accepts the connection <q>into</q> the + * handler using <tt>acceptSvcHandler</tt>, and finally turns over + * control to the handler with <tt>activateSvcHandler</tt>. + * + *@exception SocketException socket level error + *@exception InstantiationException <tt>makeSvcHandler</tt> failure + *@exception IllegalAccessException <tt>makeSvcHandler</tt> failure + *@exception IOException socket level error + */ + public void accept () throws SocketException, + InstantiationException, + IllegalAccessException, + IOException + { + + // Create a Svc_Handler using the appropriate Creation_Strategy + SvcHandler sh = this.makeSvcHandler (); + + // Accept a connection into the SvcHandler using the appropriate + // Accept_Strategy + this.acceptSvcHandler (sh); + + // Activate the SvcHandler using the appropriate ActivationStrategy + this.activateSvcHandler (sh); + } + + /** + * Bridge method for creating a <tt>SvcHandler</tt>. The default is to + * create a new <SvcHandler>. However, subclasses can override this + * policy to perform <SvcHandler> creation in any way that they like + * (such as creating subclass instances of <SvcHandler>, using a + * singleton, etc.) + *@return a new instance of the SvcHandler + *@exception InstantiationException could not create new SvcHandler + *@exception IllegalAccessException no SvcHandler factory provided + */ + protected SvcHandler makeSvcHandler () + throws InstantiationException, IllegalAccessException + { + // Create a new handler for the connection + return (SvcHandler) handlerFactory_.newInstance (); + } + + /** + * Bridge method for accepting the new connection into the + * <tt>SvcHandler</tt>. The default behavior delegates the work to + * <tt>SOCKAcceptor.accept</tt>. However, subclasses can override this + * strategy. + *@param sh SvcHandler in which to accept the connection + *@return 0 + *@exception SocketException socket level error + *@exception IOException socket level error + */ + protected int acceptSvcHandler (SvcHandler sh) + throws SocketException, IOException + { + // Create a new stream + SOCKStream sockStream = new SOCKStream (); + + // Block in accept. Returns when a connection shows up + this.sockAcceptor_.accept (sockStream); + + // Set the streams for the new handler + sh.setHandle (sockStream); + return 0; + } + + /** + * Bridge method for activating a <tt>SvcHandler</tt>. The default + * behavior of this method is to activate the <tt>SvcHandler</tt> by + * calling its open() method (which allows the <tt>SvcHandler</tt> to + * define its own concurrency strategy). However, subclasses can + * override this strategy to do more sophisticated concurrency + * activations. + *@param sh SvcHandler to activate + *@return 0 + */ + protected int activateSvcHandler (SvcHandler sh) + { + sh.open (null); + return 0; + } + + // Handler class that should be instantiated when a connection is + // made with a client + private Class handlerFactory_; + + // Our connection acceptance factory + private SOCKAcceptor sockAcceptor_; +} diff --git a/java/src/ActivateStrategy.java b/java/src/ActivateStrategy.java new file mode 100644 index 00000000000..9cff685f3ba --- /dev/null +++ b/java/src/ActivateStrategy.java @@ -0,0 +1,43 @@ +/************************************************* + * + * = PACKAGE + * ACE.Connection + * + * = FILENAME + * ActivateStrategy.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Connection; + + +/** + * <hr> + *<h2>SYNOPSIS</h2> + * Bridge supporting activation strategy used by + * <a href="ACE.Connection.StrategyAcceptor.html#_top_"><tt>StrategyAcceptor</tt></a> + * + *<h2>DESCRIPTION</h2> + * Subclass and overload + * <a href="#activateSvcHandler(ACE.Connection.SvcHandler)"><tt>activateSvcHandler</tt></a> + * in order change the activation strategy. Then, submit this subclass to + * <a href="ACE.Connection.StrategyAcceptor.html#_top_"><tt>StrategyAcceptor</tt></a> + * as the activation strategy. + * + *@see StrategyAcceptor + */ +public class ActivateStrategy +{ + /** + * Activate the Svc Handler. Note that subclasses should overwrite + * this method to provide a different Activate strategy. + *@param sh Svc Handler to activate + *@return zero if success, non-zero for failure + */ + public int activateSvcHandler (SvcHandler sh) + { + sh.open (null); + return 0; + } +} diff --git a/java/src/Connector.java b/java/src/Connector.java new file mode 100644 index 00000000000..337d360bede --- /dev/null +++ b/java/src/Connector.java @@ -0,0 +1,149 @@ +/************************************************* + * + * = PACKAGE + * ACE.Connection + * + * = FILENAME + * Connector.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Connection; + +import java.io.*; +import java.net.*; +import ACE.OS.*; +import ACE.SOCK_SAP.*; +import ACE.ServiceConfigurator.*; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Abstract factory for connecting a + * (<a href="ACE.Connection.SvcHandler.html"><tt>SvcHandler</tt></a>), + * to an application. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * Implements the basic strategy for actively establishing connections + * with applications. The <tt>Connector</tt> establishes the connection, + * passing it on to a <tt>SvcHandler</tt> instance, and handing over + * control to that instance. + *<p> + * TCP is the transport mechanism used, via + * <a href="ACE.SOCK_SAP.SOCKConnector.html#_top_"><tt>SOCKConnector</tt></a>. + *</blockquote> + * + *<h2>NOTES</h2> + *<blockquote> + * This class, as currently implemented, does not work like its C++ counterpart. + * Future versions are expected to rectify this discrepancy. + *</blockquote> + * + *@see SOCKConnector,SvcHandler + */ +public class Connector extends ServiceObject +{ + /** + * Create a Connector. Do nothing constructor. Allows user to + * call <a href="#open(java.lang.String)">open</a>() later. + */ + public Connector () + { + } + + /** + * Create a Connector passing in server hostname and port + * number, effectively shorthand for calling + * <a href="#open(java.lang.String)">open</a>(). + *@param hostname server hostname + *@param port server port number + */ + public Connector (String hostname, int port) + { + this.open (hostname, port); + } + + /** + * Initialize the Connector passing in server hostname and port + * number. Note that no connection attempt is made. + *@param hostname server hostname + *@param port server port number + */ + public void open (String hostname, int port) + { + this.hostname_ = hostname; + this.port_ = port; + } + + /** + * Connect to the server. + *@param sh Svc Handler to use to handle the connection + */ + public void connect (SvcHandler sh) throws UnknownHostException, + SocketException, + InstantiationException, + IllegalAccessException, + IOException + { + // Make a connection using the appropriate Connection_Strategy + this.connectSvcHandler (sh); + + // Activate the Svc_Handler using the appropriate Activation_Strategy + this.activateSvcHandler (sh); + } + + /** + * Bridge method for making a new connection. The default behavior + * creates a new SOCKConnector and then calls setHandle() on the + * <SvcHandler> that was passed in. Subclasses can override this + * strategy, if needed. + *@param sh Svc Handler to use to handle the connection + *@return 0 + */ + protected int connectSvcHandler (SvcHandler sh) throws + SocketException, IOException + { + // Create a new stream + SOCKStream sockStream = new SOCKStream (); + + // Create a SOCK_Connector (note the constructor does the connect for us) + this.sockConnector_ = new SOCKConnector (sockStream, + this.hostname_, + this.port_); + ACE.DEBUG ("Connected to " + + sockStream.socket ().getInetAddress ()); + + // Set the streams for the new handler + sh.setHandle (sockStream); + return 0; + } + + /** + * Bridge method for activating a <SvcHandler>. The default + * behavior of this method is to activate the <SvcHandler> by + * calling its open() method (which allows the SVC_HANDLER to define + * its own concurrency strategy). However, subclasses can override + * this strategy to do more sophisticated concurrency activations. + *@param sh Svc Handler to activate + *@return 0 + */ + protected int activateSvcHandler (SvcHandler sh) + { + sh.open (null); + return 0; + } + + + // Port server is listening on + private int port_; + + // Server hostname + private String hostname_; + + // Our connection factory + private SOCKConnector sockConnector_; +} diff --git a/java/src/CreationStrategy.java b/java/src/CreationStrategy.java new file mode 100644 index 00000000000..787e0c22c24 --- /dev/null +++ b/java/src/CreationStrategy.java @@ -0,0 +1,61 @@ +/************************************************* + * + * = PACKAGE + * ACE.Connection + * + * = FILENAME + * CreationStrategy.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Connection; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Defines the interface for specifying a creation strategy for a + * <a href="ACE.Connection.SvcHandler.html#_top_"><tt>SvcHandler</tt></a> to the + * <a href="ACE.Connection.StrategyAcceptor.html#_top_"><tt>StrategyAcceptor</tt></a>. + *</blockquote> + * + * <p><b>DESCRIPTION</b><br> + *<blockquote> + * The default behavior is to make a new SvcHandler. However, + * subclasses can override this strategy to perform SvcHandler + * creation in any way that they like (such as creating subclass + * instances of SvcHandler, using a singleton, dynamically + * linking the handler, etc.). + *</blockquote> + * + *@see SvcHandler,StrategyAcceptor,AcceptStrategy,ActivateStrategy + */ +public class CreationStrategy +{ + /** + * Create an instance of Creation Strategy. + *@param handlerFactory Svc Handler factory that is used to create + * an instance of a Svc Handler + */ + public CreationStrategy (Class handlerFactory) + { + this.handlerFactory_ = handlerFactory; + } + + /** + * Create a new SvcHandler. Note that subclasses should override + * this method to provide a new creation strategy. + *@return reference to a new instance of the SvcHandler (or subclass) + *@exception InstantiationException Unable to instantiate. + *@exception IllegalAccessException No handler factory available. + */ + public SvcHandler makeSvcHandler () throws InstantiationException, + IllegalAccessException + { + // Create a new Svc_Handler + return (SvcHandler) handlerFactory_.newInstance (); + } + + private Class handlerFactory_; +} diff --git a/java/src/EventHandler.java b/java/src/EventHandler.java new file mode 100644 index 00000000000..6b40618e0c3 --- /dev/null +++ b/java/src/EventHandler.java @@ -0,0 +1,50 @@ +/************************************************* + * + * = PACKAGE + * ACE.Reactor + * + * = FILENAME + * EventHandler.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Reactor; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Provides an abstract interface for handling timer events. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * Classes implementing this interface handle a timer's + * expiration. + *</blockquote> + * + * <h2>NOTES</h2> + *<blockquote> + * Users of C++ ACE will notice that this defines a substantially + * smaller interface than the C++ counterpart. Signal events are + * absent due to the complete absence of this feature from Java itself. + * Moreover, at this point + * there is still some question regarding whether or not the I/O + * portion will make any sense or fit into the Java model for I/O. + *</blockquote> + * + *@see TimerQueue,Reactor + */ +public interface EventHandler +{ + /** + * Called when timer expires. + *@param tv Time Value for which timer was set + *@param obj An arbitrary object that was passed to the Timer Queue + * (Asynchronous Completion Token) + */ + public int handleTimeout (TimeValue tv, Object obj); +} + +// Note that more methods will be added as needed diff --git a/java/src/GetOpt.java b/java/src/GetOpt.java new file mode 100644 index 00000000000..5060d7f8b94 --- /dev/null +++ b/java/src/GetOpt.java @@ -0,0 +1,150 @@ +/************************************************* + * + * = PACKAGE + * ACE.Misc + * + * = FILENAME + * GetOpt.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Misc; + +import java.io.*; +import java.util.Hashtable; +import java.util.StringTokenizer; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Iterator for parsing command-line arguments. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * This version of `get_opt' appears to the caller like standard + * Unix `get_opt' but it behaves differently for the user, since + * it allows the user to intersperse the options with the other + * arguments. + * + * <p> As `get_opt' works, it permutes the elements of `argv' so that, + * when it is done, all the options precede everything else. Thus + * all application programs are extended to handle flexible argument + * order. + *</blockquote> + * + */ +public class GetOpt +{ + /** + * Constructor + *@param args command line arguments + *@param optstring string containing the legitimate option + * characters. A colon in optstring means that the previous character + * is an option that wants an argument which is then taken from the + * rest of the current args-element. Here is an example of what + * optstring might look like: "c:dP:p". + */ + public GetOpt (String[] args, String optstring) + { + // Cache the arguments + this.args_ = args; + this.hasArg_ = false; + + // Build the arg hashtable + this.buildArgTable (optstring); + } + + /** + * Scan elements specified in optstring for next option flag. + *@return The character corresponding to the next flag. + */ + public int next () + { + if (this.args_ == null) + return -1; + + if (this.index_ < this.args_.length) + { + String arg = this.args_[this.index_++]; + + // Make sure flag starts with "-" + if (!arg.startsWith ("-")) + return -1; + + // Check if there is more than one character specified as flag + if (arg.length () > 2) + return -1; + + // So far so good + // Check if the flag is in the arg_table and if it is get the + // associated binding. + Character c = (Character) this.argTable_.get (new Character (arg.charAt (1))); + if (c == null) + return -1; + + if (c.charValue () == '#') + { + this.hasArg_ = false; + return arg.charAt (1); + } + else if (c.charValue () == ':') + { + this.hasArg_ = true; + return arg.charAt (1); + } + else // This should not happen + return -1; + } + return -1; + } + + /** + * Get the argument (if any) associated with the flag. + *@return the argument associated with the flag. + */ + public String optarg () + { + if (this.hasArg_) + return this.args_[this.index_++]; + else + return null; + } + + // Build the argument table + private void buildArgTable (String s) + { + this.argTable_ = new Hashtable (); + StringTokenizer tokens = new StringTokenizer (s, ":"); + while (tokens.hasMoreTokens ()) + { + // Get the next token + String t = tokens.nextToken (); + + // First add all flags except the one with ":" after it + // Note "#" is an arbitrary character we use to distinguish + // the two cases + for (int i = 0; i < t.length () - 1; i++) + this.argTable_.put (new Character (t.charAt (i)), + new Character ('#')); + + // Now Add the flag just before ":" to the arg_table + this.argTable_.put (new Character (t.charAt (t.length () - 1)), + new Character (':')); + } + } + + private String [] args_; + // Copy of the args passed in + + private boolean hasArg_; + // Indicator that the flag has an argument following it + + private int index_; + // Index into the array of arguments + + private Hashtable argTable_; + // Table of flags that take arguments after them +} diff --git a/java/src/Makefile b/java/src/Makefile new file mode 100644 index 00000000000..cb2d14cc454 --- /dev/null +++ b/java/src/Makefile @@ -0,0 +1,66 @@ +# Makefile + +.SUFFIXES: .java .class + +JACE_WRAPPER = .. +CLASSDIR = $(JACE_WRAPPER)/classes +DOCDIR = $(JACE_WRAPPER)/doc + +CLASSPATH := $(CLASSDIR):$(CLASSPATH) + +all: + javac -d ${JACE_WRAPPER}/classes $(files) +doc: + javadoc -d ${JACE_WRAPPER}/doc $(files) $(packages) + +files = ACE.java \ + OS.java \ + TimeValue.java \ + EventHandler.java \ + ProfileTimer.java \ + TimerQueue.java \ + GetOpt.java \ + MessageType.java \ + MessageBlock.java \ + MessageQueue.java \ + ThreadManager.java \ + Semaphore.java \ + Mutex.java \ + RWMutex.java \ + TaskFlags.java \ + Task.java \ + SOCKStream.java \ + SOCKAcceptor.java \ + SOCKConnector.java \ + SvcHandler.java \ + CreationStrategy.java \ + AcceptStrategy.java \ + ActivateStrategy.java \ + Acceptor.java \ + StrategyAcceptor.java \ + Connector.java \ + ServiceRepository.java \ + ServiceObject.java \ + ServiceConfig.java \ + TimeoutException.java \ + TimedWait.java + +packages = ACE \ + ACE.ASX \ + ACE.Connection \ + ACE.Concurrency \ + ACE.Misc \ + ACE.OS \ + ACE.Reactor \ + ACE.SOCK_SAP \ + ACE.Service_Configurator \ + ACE.Timers + + +clean: + find ${JACE_WRAPPER}/classes/ACE -name '*.class' -print | xargs ${RM} + +docclean: + find ${JACE_WRAPPER}/doc -name '*.html' -print | xargs ${RM} + +realclean: clean docclean diff --git a/java/src/MessageBlock.java b/java/src/MessageBlock.java new file mode 100644 index 00000000000..6f81f279291 --- /dev/null +++ b/java/src/MessageBlock.java @@ -0,0 +1,453 @@ +/************************************************* + * + * = PACKAGE + * ACE.ASX + * + * = FILENAME + * MessageBlock.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ASX; + +import ACE.OS.*; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Object used to store messages in the ASX framework. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * <tt>MessageBlock</tt> is modeled after the message data structures + * used in System V STREAMS. A <tt>MessageBlock</tt> is composed of + * one or more <tt>MessageBlock</tt>s that are linked together by <em>PREV</em> + * and <em>NEXT</em> pointers. In addition, a <tt>MessageBlock</tt> may also be + * linked to a chain of other <tt>MessageBlock</tt>s. This structure + * enables efficient manipulation of arbitrarily-large messages + * <em>without</em> incurring memory copying overhead. + *</blockquote> + * + *@see MessageQueue + */ +public class MessageBlock +{ + /** + * Create an empty Message Block + */ + public MessageBlock () + { + this (0); + } + + /** + * Create an empty Message Block. + * Note that this assumes that type of MessageBlock is MB_DATA. + *@param size size of the Message Block to create. + */ + public MessageBlock (int size) + { + // Note the explicit cast toString() is needed. For some strange + // reason, it fails otherwise if size == 0. + this ((new StringBuffer (size)).toString ()); + } + + /** + * Create a Message Block. Note that this assumes that type of + * MessageBlock is MB_DATA. + *@param data initial data to create a Message Block with. + */ + public MessageBlock (String data) + { + this (MessageType.MB_DATA, + null, + data); + } + + /** + * Create a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param data initial data to create Message Block with + */ + public MessageBlock (int type, + MessageBlock cont, + String data) + { + this.flags_ = 0; + this.priority_ = 0; + this.next_ = null; + this.prev_ = null; + + this.init (type, cont, data); + } + + /** + * Create a Message Block. Note that this assumes that type of + * MessageBlock is MB_OBJECT. + *@param obj initial object to create a Message Block with. + */ + public MessageBlock (Object obj) + { + this (MessageType.MB_OBJECT, + null, + obj); + } + + /** + * Create a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param obj initial object to create Message Block with + */ + public MessageBlock (int type, + MessageBlock cont, + Object obj) + { + this.init (type, cont, obj); + } + + /* Initialize the Message Block + *@param data data to initialize Message Block with + */ + public void init (String data) + { + this.base_ = new StringBuffer (data); + } + + /** + * Initialize a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param data data to initialize Message Block with + */ + public void init (int msgType, + MessageBlock msgCont, + String data) + { + if (data.length () == 0) + this.base_ = new StringBuffer (0); + else + this.base_ = new StringBuffer (data); + this.type_ = msgType; + this.cont_ = msgCont; + } + + /** + * Initialize a Message Block. Note that this assumes that type of + * MessageBlock is MB_OBJECT. + *@param obj initial object to initialize a Message Block with. + */ + public void init (Object obj) + { + this.init (MessageType.MB_OBJECT, null, obj); + } + + /** + * Initialize a Message Block. + *@param type type of the Message Block (must be one of those + * specified in class Message Type) + *@param cont next block of data + *@param obj object to initialize Message Block with + */ + public void init (int msgType, + MessageBlock msgCont, + Object obj) + { + this.obj_ = obj; + this.type_ = msgType; + this.cont_ = msgCont; + this.flags_ = 0; + this.priority_ = 0; + this.next_ = null; + this.prev_ = null; + } + + /** + * Set message flags. Note that the flags will be set on top of + * already set flags. + *@param moreFlags flags to set for the Message Block. + */ + public long setFlags (long moreFlags) + { + // Later we might mask more_flags so that user can't change + // internal ones: more_flags &= ~(USER_FLAGS -1). + this.flags_ = ACE.SET_BITS (this.flags_, moreFlags); + return this.flags_; + } + + /** + * Unset message flags. + *@param lessFlags flags to unset for the Message Block. + */ + public long clrFlags (long lessFlags) + { + // Later we might mask more_flags so that user can't change + // internal ones: less_flags &= ~(USER_FLAGS -1). + this.flags_ = ACE.CLR_BITS (this.flags_, lessFlags); + return this.flags_; + } + + /** + * Get the message flags. + *@return Message flags + */ + public long flags () + { + return this.flags_; + } + + /** + * Get the type of the message. + *@return message type + */ + public int msgType () + { + return this.type_; + } + + /** + * Set the type of the message. + *@param t type of the message + */ + public void msgType (int t) + { + this.type_ = t; + } + + /** + * Get the class of the message. Note there are two classes, + * <normal> messages and <high-priority> messages. + *@return message class + */ + public int msgClass () + { + return this.msgType () >= MessageType.MB_PRIORITY + ? MessageType.MB_PRIORITY : MessageType.MB_NORMAL; + } + + /** + * Find out if the message is a data message. + *@return true if message is a data message, false otherwise + */ + public boolean isDataMsg () + { + int mt = this.msgType (); + return mt == MessageType.MB_DATA + || mt == MessageType.MB_PROTO + || mt == MessageType.MB_PCPROTO; + } + + /** + * Find out if the message is an object message. + *@return true if message is an object message, false otherwise + */ + public boolean isObjMsg () + { + int mt = this.msgType (); + return mt == MessageType.MB_OBJECT + || mt == MessageType.MB_PROTO + || mt == MessageType.MB_PCPROTO; + } + + /** + * Get the priority of the message. + *@return message priority + */ + public long msgPriority () + { + return this.priority_; + } + + /** + * Set the priority of the message. + *@param pri priority of the message + */ + public void msgPriority (long pri) + { + this.priority_ = pri; + } + + /** + * Get message data. This assumes that msgType is MB_DATA. + *@return message data + */ + public String base () + { + // Create a String object to return + char temp[] = new char [this.base_.length ()]; + this.base_.getChars (0, this.base_.length (), temp, 0); + return new String (temp); + } + + /** + * Set the message data. This assumes that msgType is MB_DATA. + *@param data message data + *@param msgFlags message flags + */ + public void base (String data, + long msgFlags) + { + this.base_ = new StringBuffer (data); + this.flags_ = msgFlags; + } + + /** + * Get message object. This assumes that msgType is MB_OBJECT. + *@return message object + */ + public Object obj () + { + return this.obj_; + } + + /** + * Set the message object. This assumes that msgType is MB_OBJECT. + *@param object message object + *@param msgFlags message flags + */ + public void obj (Object obj, + long msgFlags) + { + this.obj_ = obj; + this.flags_ = msgFlags; + } + + // = The following four methods only make sense if the Message_Block + // is of type MB_DATA and not MB_OBJECT. + + /** + * Get length of the message. This method only makes sense if the + * MessageBlock is of type MB_DATA and not MB_OBJECT. + *@return length of the message. + */ + public int length () + { + return this.base_.length (); + } + + /** + * Set the length of the message. This method only makes sense if the + * MessageBlock is of type MB_DATA and not MB_OBJECT. + *@param n message length + */ + public void length (int n) + { + this.base_.setLength (n); + } + + /** + * Get size of the allocated buffer for the message. This method + * only makes sense if the MessageBlock is of type MB_DATA and not + * MB_OBJECT. + *@return size of the message buffer + */ + public int size () + { + return this.base_.capacity (); + } + + /** + * Set the total size of the buffer. This method will grow the + * buffer if need be. Also, this method only makes sense if the + * MessageBlock is of type MB_DATA and not MB_OBJECT. + *@param n size of message buffer + */ + public void size (int n) + { + this.base_.ensureCapacity (n); + } + + + /** + * Get the continuation field. The coninuation field is used to + * chain together composite messages. + *@return the continuation field + */ + public MessageBlock cont () + { + return this.cont_; + } + + /** + * Set the continuation field. The coninuation field is used to + * chain together composite messages. + *@param msgCont continuation field + */ + void cont (MessageBlock msgCont) + { + this.cont_ = msgCont; + } + + /** + * Get link to next message. The next message points to the + * <MessageBlock> directly ahead in the MessageQueue. + *@return next message block + */ + MessageBlock next () + { + return this.next_; + } + + /** + * Set link to next message. The next message points to the + * <MessageBlock> directly ahead in the MessageQueue. + *@param msgBlock next message block + */ + void next (MessageBlock msgBlock) + { + this.next_ = msgBlock; + } + + /** + * Get link to previous message. The previous message points to the + * <MessageBlock> directly before in the MessageQueue. + *@return previous message block + */ + MessageBlock prev () + { + return this.prev_; + } + + /** + * Set link to previous message. The previous message points to the + * <MessageBlock> directly before in the MessageQueue. + *@param msgBlock previous message block + */ + void prev (MessageBlock msgBlock) + { + this.prev_ = msgBlock; + } + + private int type_; + // Type of message. + + private long flags_; + // Misc flags. + + private long priority_; + // Priority of message. + + private StringBuffer base_; + // String data of message block (initialized to null). + + private Object obj_; + // Object data of message block (initialized to null). + + private MessageBlock cont_; + // Next message block in the chain. + + private MessageBlock next_; + // Next message in the list. + + private MessageBlock prev_; + // Previous message in the list. + +} + diff --git a/java/src/MessageQueue.java b/java/src/MessageQueue.java new file mode 100644 index 00000000000..b13b4fdbf8d --- /dev/null +++ b/java/src/MessageQueue.java @@ -0,0 +1,614 @@ +/************************************************* + * + * = PACKAGE + * ACE.ASX + * + * = FILENAME + * MessageQueue.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ASX; + +import java.util.Date; +import ACE.OS.*; +import ACE.Reactor.*; + +class NotFullCondition extends TimedWait +{ + public NotFullCondition (MessageQueue mq) + { + super (mq); + this.mq_ = mq; + } + + public boolean condition () { + // Delegate to the appropriate conditional + // check on the MessageQueue. + return !this.mq_.isFull (); + } + private MessageQueue mq_; +} + +class NotEmptyCondition extends TimedWait +{ + public NotEmptyCondition (MessageQueue mq) + { + super (mq); + this.mq_ = mq; + } + + public boolean condition () { + // Delegate to the appropriate conditional + // check on the MessageQueue. + return !this.mq_.isEmpty (); + } + private MessageQueue mq_; +} + + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * A thread-safe message queueing facility, modeled after the + * queueing facilities in System V StreamS. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *</blockquote> + * <tt>MessageQueue</tt> is the central queueing facility for messages + * in the ASX framework. All operations are thread-safe, as it is intended + * to be used for inter-thread communication (<em>e.g.</em>, a producer and + * consumer thread joined by a <tt>MessageQueue</tt>). The queue + * consiste of <tt>MessageBlock</tt>s. + *</blockquote> + * + *@see MessageBlock,TimeValue + */ +public class MessageQueue +{ + /** + * Default constructor + */ + public MessageQueue () + { + this (DEFAULT_HWM, DEFAULT_LWM); + } + + /** + * Create a Message Queue with high and low water marks. + *@param hwm High water mark (max number of bytes allowed in the + * queue) + *@param lwm Low water mark (min number of bytes in the queue) + */ + public MessageQueue (int hwm, int lwm) + { + if (this.open (hwm, lwm) == -1) + ACE.ERROR ("open"); + } + + /** + * Initialize a Message Queue with high and low water marks. + *@param hwm High water mark (max number of bytes allowed in the + * queue) + *@param lwm Low water mark (min number of bytes in the queue) + */ + public synchronized int open (int hwm, int lwm) + { + this.highWaterMark_ = hwm; + this.lowWaterMark_ = lwm; + this.deactivated_ = false; + this.currentBytes_ = 0; + this.currentCount_ = 0; + this.tail_ = null; + this.head_ = null; + return 0; + } + + // = For enqueue, enqueueHead, enqueueTail, and dequeueHead if + // timeout is specified, the caller will wait for amount of time in + // tv. Calls will return, however, when queue is closed, + // deactivated, or if the time specified in tv elapses. + + /** + * Enqueue a <MessageBlock> into the <MessageQueue> in accordance + * with its <msgPriority> (0 is lowest priority). Note that the + * call will block (unless the queue has been deactivated). + *@param newItem item to enqueue onto the Message Queue + *@return -1 on failure, else the number of items still on the queue. + */ + public synchronized int enqueue (MessageBlock newItem) throws InterruptedException + { + return this.enqueue (newItem, null); + } + + /** + * Enqueue a <MessageBlock> into the <MessageQueue> in accordance + * with its <msgPriority> (0 is lowest priority). Note that the + * call will return if <timeout> amount of time expires or if the + * queue has been deactivated. + *@param newItem item to enqueue onto the Message Queue + *@param tv amount of time (TimeValue) to wait before returning + * (unless operation completes before) + *@return -1 on failure, else the number of items still on the + * queue. + */ + public synchronized int enqueue (MessageBlock newItem, + TimeValue tv) throws InterruptedException + { + int result = -1; + if (this.deactivated_) + return -1; + try + { + if (tv == null) // Need to do a blocking wait + notFullCondition_.timedWait (); + else // Need to do a timed wait + notFullCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return -1; + } + + // Check again if queue is still active + if (this.deactivated_) + return -1; + else + result = this.enqueueInternal (newItem); + + // Tell any blocked threads that the queue has a new item! + this.notEmptyCondition_.broadcast (); + return result; + } + + /** + * Enqueue a <MessageBlock> at the end of the <MessageQueue>. Note + * that the call will block (unless the queue has been deactivated). + *@param newItem item to enqueue onto the Message Queue + *@return -1 on failure, else the number of items still on the queue. + */ + public synchronized int enqueueTail (MessageBlock newItem) throws InterruptedException + { + return this.enqueueTail (newItem, null); + } + + /** + * Enqueue a <MessageBlock> at the end of the <MessageQueue>. Note + * that the call will return if <timeout> amount of time expires or + * if the queue has been deactivated. + *@param newItem item to enqueue onto the Message Queue + *@param tv amount of time (TimeValue) to wait before returning + * (unless operation completes before) + *@return -1 on failure, else the number of items still on the queue. + */ + public synchronized int enqueueTail (MessageBlock newItem, + TimeValue tv) throws InterruptedException + { + int result = -1; + if (this.deactivated_) + return -1; + try + { + if (tv == null) // Need to do a blocking wait + notFullCondition_.timedWait (); + else // Need to do a timed wait + notFullCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return -1; + } + + // Check again if queue is still active + if (this.deactivated_) + return -1; + else + result = this.enqueueTailInternal (newItem); + + // Tell any blocked threads that the queue has a new item! + this.notEmptyCondition_.broadcast (); + return result; + } + + /** + * Enqueue a <MessageBlock> at the head of the <MessageQueue>. Note + * that the call will block (unless the queue has been deactivated). + *@param newItem item to enqueue onto the Message Queue + *@return -1 on failure, else the number of items still on the queue. + */ + public synchronized int enqueueHead (MessageBlock newItem) throws InterruptedException + { + return this.enqueueHead (newItem, null); + } + + /** + * Enqueue a <MessageBlock> at the head of the <MessageQueue>. Note + * that the call will return if <timeout> amount of time expires or + * if the queue has been deactivated. + *@param newItem item to enqueue onto the Message Queue + *@param tv amount of time (TimeValue) to wait before returning + * (unless operation completes before) + *@return -1 on failure, else the number of items still on the queue. + */ + public synchronized int enqueueHead (MessageBlock newItem, + TimeValue tv) throws InterruptedException + { + int result = -1; + if (this.deactivated_) + return -1; + try + { + if (tv == null) // Need to do a blocking wait + notFullCondition_.timedWait (); + else // Need to do a timed wait + notFullCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return -1; + } + + // Check again if queue is still active + if (this.deactivated_) + return -1; + else + result = this.enqueueHeadInternal (newItem); + + // Tell any blocked threads that the queue has a new item! + this.notEmptyCondition_.broadcast (); + return result; + } + + /** + * Dequeue and return the <MessageBlock> at the head of the + * <MessageQueue>. Note that the call will block (unless the queue + * has been deactivated). + *@return null on failure, else the <MessageBlock> at the head of queue. + */ + public synchronized MessageBlock dequeueHead () throws InterruptedException + { + return this.dequeueHead (null); + } + + /** + * Dequeue and return the <MessageBlock> at the head of the + * <MessageQueue>. Note that the call will return if <timeout> + * amount of time expires or if the queue has been deactivated. + *@return null on failure, else the <MessageBlock> at the head of queue. + */ + public synchronized MessageBlock dequeueHead (TimeValue tv) throws InterruptedException + { + MessageBlock result = null; + if (this.deactivated_) + return null; + try + { + if (tv == null) // Need to do a blocking wait + notEmptyCondition_.timedWait (); + else // Need to do a timed wait + notEmptyCondition_.timedWait (tv); + } + catch (TimeoutException e) + { + return null; + } + + // Check again if queue is still active + if (this.deactivated_) + return null; + else + result = this.dequeueHeadInternal (); + + // Tell any blocked threads that the queue has room for an item! + this.notFullCondition_.broadcast (); + return result; + } + + /** + * Check if queue is full. + *@return true if queue is full, else false. + */ + public synchronized boolean isFull () + { + return this.isFullInternal (); + } + + /** + * Check if queue is empty. + *@return true if queue is empty, else false. + */ + public synchronized boolean isEmpty () + { + return this.isEmptyInternal (); + } + + /** + * Get total number of bytes on the queue. + *@return total number number of bytes on the queue + */ + public int messageBytes () + { + return this.currentBytes_; + } + + /** + * Get total number of messages on the queue. + *@return total number number of messages on the queue + */ + public int messageCount () + { + return this.currentCount_; + } + + // = Flow control routines + + /** + * Get high watermark. + *@return high watermark + */ + public int highWaterMark () + { + return this.highWaterMark_; + } + + /** + * Set high watermark. + *@param hwm high watermark + */ + public void highWaterMark (int hwm) + { + this.highWaterMark_ = hwm; + } + + /** + * Get low watermark. + *@return low watermark + */ + public int lowWaterMark () + { + return this.lowWaterMark_; + } + + /** + * Set low watermark. + *@param lwm low watermark + */ + public void lowWaterMark (int lwm) + { + this.lowWaterMark_ = lwm; + } + + // = Activation control methods. + + /** + * Deactivate the queue and wakeup all threads waiting on the queue + * so they can continue. No messages are removed from the queue, + * however. Any other operations called until the queue is + * activated again will immediately return -1. + *@return WAS_INACTIVE if queue was inactive before the call and + * WAS_ACTIVE if queue was active before the call. + */ + public synchronized int deactivate () + { + return this.deactivateInternal (); + } + + + /** + * Reactivate the queue so that threads can enqueue and dequeue + * messages again. + *@return WAS_INACTIVE if queue was inactive before the call and + * WAS_ACTIVE if queue was active before the call. + */ + public synchronized int activate () + { + return this.activateInternal (); + } + + protected boolean isEmptyInternal () + { + // Not sure about this one!!!! + return this.currentBytes_ <= this.lowWaterMark_ && this.currentCount_ <= 0; + } + + protected boolean isFullInternal () + { + return this.currentBytes_ > this.highWaterMark_; + } + + protected int deactivateInternal () + { + int currentStatus = + this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; + + this.notFullCondition_.broadcast (); + this.notEmptyCondition_.broadcast (); + + this.deactivated_ = true; + return currentStatus; + } + + protected int activateInternal () + { + int currentStatus = + this.deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; + this.deactivated_ = false; + + return currentStatus; + } + + protected int enqueueTailInternal (MessageBlock newItem) + { + if (newItem == null) + return -1; + + // List was empty, so build a new one. + if (this.tail_ == null) + { + this.head_ = newItem; + this.tail_ = newItem; + newItem.next (null); + newItem.prev (null); + } + // Link at the end. + else + { + newItem.next (null); + this.tail_.next (newItem); + newItem.prev (this.tail_); + this.tail_ = newItem; + } + + // Make sure to count *all* the bytes in a composite message!!! + for (MessageBlock temp = newItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ += temp.size (); + + this.currentCount_++; + return this.currentCount_; + } + + protected int enqueueHeadInternal (MessageBlock newItem) + { + if (newItem == null) + return -1; + + newItem.prev (null); + newItem.next (this.head_); + + if (this.head_ != null) + this.head_.prev (newItem); + else + this.tail_ = newItem; + + this.head_ = newItem; + + // Make sure to count *all* the bytes in a composite message!!! + for (MessageBlock temp = newItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ += temp.size (); + + this.currentCount_++; + + return this.currentCount_; + } + + protected int enqueueInternal (MessageBlock newItem) + { + if (newItem == null) + return -1; + + if (this.head_ == null) + // Check for simple case of an empty queue, where all we need to + // do is insert <newItem> into the head. + return this.enqueueHeadInternal (newItem); + else + { + MessageBlock temp; + + // Figure out where the new item goes relative to its priority. + + for (temp = this.head_; + temp != null; + temp = temp.next ()) + { + if (temp.msgPriority () <= newItem.msgPriority ()) + // Break out when we've located an item that has lower + // priority that <newItem>. + break; + } + + if (temp == null) + // Check for simple case of inserting at the end of the queue, + // where all we need to do is insert <newItem> after the + // current tail. + return this.enqueueTailInternal (newItem); + else if (temp.prev () == null) + // Check for simple case of inserting at the beginning of the + // queue, where all we need to do is insert <newItem> before + // the current head. + return this.enqueueHeadInternal (newItem); + else + { + // Insert the message right before the item of equal or lower + // priority. + newItem.next (temp); + newItem.prev (temp.prev ()); + temp.prev ().next (newItem); + temp.prev (newItem); + } + } + + // Make sure to count *all* the bytes in a composite message!!! + for (MessageBlock temp = newItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ += temp.size (); + + this.currentCount_++; + return this.currentCount_; + } + + protected MessageBlock dequeueHeadInternal () + { + MessageBlock firstItem = this.head_; + this.head_ = this.head_.next (); + + if (this.head_ == null) + this.tail_ = null; + + // Make sure to subtract off all of the bytes associated with this + // message. + for (MessageBlock temp = firstItem; + temp != null; + temp = temp.cont ()) + this.currentBytes_ -= temp.size (); + + this.currentCount_--; + return firstItem; + } + + + /** Default high watermark (16 K). */ + public final static int DEFAULT_HWM = 16 * 1024; + + /** Default low watermark. */ + public final static int DEFAULT_LWM = 0; + + /** Message queue was active before activate() or deactivate(). */ + public final static int WAS_ACTIVE = 1; + + /** Message queue was inactive before activate() or deactivate(). */ + public final static int WAS_INACTIVE = 2; + + private int highWaterMark_; + // Greatest number of bytes before blocking. + + private int lowWaterMark_; + // Lowest number of bytes before unblocking occurs. + + private boolean deactivated_; + // Indicates that the queue is inactive. + + private int currentBytes_; + // Current number of bytes in the queue. + + private int currentCount_; + // Current number of messages in the queue. + + private MessageBlock head_; + // Head of Message_Block list. + + private MessageBlock tail_; + // Tail of Message_Block list. + + // The Delegated Notification mechanisms. + private NotFullCondition notFullCondition_ = new NotFullCondition (this); + private NotEmptyCondition notEmptyCondition_ = new NotEmptyCondition (this); + +} diff --git a/java/src/MessageType.java b/java/src/MessageType.java new file mode 100644 index 00000000000..2443ec38a67 --- /dev/null +++ b/java/src/MessageType.java @@ -0,0 +1,110 @@ +/************************************************* + * + * = PACKAGE + * ACE.ASX + * + * = FILENAME + * MessageType.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ASX; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Message types used by ACE.MessageBlock. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * Defines bit masks used to identify various types of messages. + *</blockquote> + * + *<h2>NOTES</h2> + *<blockquote> + * This class is not intended to be instantiable. + *</blockquote> + */ +public class MessageType +{ + // = Data and protocol messages (regular and priority) + /** regular data */ + public static final int MB_DATA = 0x01; + + /** protocol control */ + public static final int MB_PROTO = 0x02; + + /** regular data */ + public static final int MB_OBJECT = 0x09; + + + // = Control messages (regular and priority) + /** line break */ + public static final int MB_BREAK = 0x03; + + /** pass file pointer */ + public static final int MB_PASSFP = 0x04; + + /** post an event to an event queue */ + public static final int MB_EVENT = 0x05; + + /** generate process signal */ + public static final int MB_SIG = 0x06; + + /** ioctl; set/get params */ + public static final int MB_IOCTL = 0x07; + + /** set various stream head options */ + public static final int MB_SETOPTS = 0x08; + + + // = Control messages (high priority; go to head of queue) + /** acknowledge ioctl */ + public static final int MB_IOCACK = 0x81; + + /** negative ioctl acknowledge */ + public static final int MB_IOCNAK = 0x82; + + /** priority proto message */ + public static final int MB_PCPROTO = 0x83; + + /** generate process signal */ + public static final int MB_PCSIG = 0x84; + + /** generate read notification */ + public static final int MB_READ = 0x85; + + /** flush your queues */ + public static final int MB_FLUSH = 0x86; + + /** stop transmission immediately */ + public static final int MB_STOP = 0x87; + + /** restart transmission after stop */ + public static final int MB_START = 0x88; + + /** line disconnect */ + public static final int MB_HANGUP = 0x89; + + /** fatal error used to set u.u_error */ + public static final int MB_ERROR = 0x8a; + + /** post an event to an event queue */ + public static final int MB_PCEVENT = 0x8b; + + + /** Normal priority messages */ + public static final int MB_NORMAL = 0x00; + + /** High priority control messages */ + public static final int MB_PRIORITY = 0x80; + + // Default private constructor to avoid instantiation + private MessageType () + { + } +} + diff --git a/java/src/Mutex.java b/java/src/Mutex.java new file mode 100644 index 00000000000..7a0fcc2c28f --- /dev/null +++ b/java/src/Mutex.java @@ -0,0 +1,93 @@ +/************************************************* + * + * = PACKAGE + * ACE.Concurrency + * + * = FILENAME + * Mutex.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Concurrency; + +import java.util.*; +import ACE.ASX.*; +import ACE.Reactor.*; + +class TimedWaitMAdapter extends TimedWait +{ + TimedWaitMAdapter (Object obj) + { + super (obj); + } + + // Check to see if the lock is currently held or not. + public boolean condition () + { + return !this.inUse_; + } + + // Acquire/Release the lock + public void inUse (boolean c) + { + this.inUse_ = c; + } + + private boolean inUse_ = false; + // The actual lock +} + + +/** + * <hr> + * <h2>TITLE</h2> + *<blockquote> + * Value added abstraction for mutex variable creation. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * A timed mutex, <em>i.e.</em> a mutex whose operations do not + * block forever and can <q>time out</q>. + *</blockquote> + */ +public class Mutex +{ + /** + * Acquire the mutex. Note that this will block. + *@exception InterruptedException exception during wait + */ + public synchronized void acquire () throws InterruptedException + { + this.monitor_.timedWait (); + this.monitor_.inUse (true); + } + + /** + * Acquire the mutex. Note that the call will return if <timeout> + * amount of time expires. + *@param tv amount of time (TimeValue) to wait before returning + * (unless operation completes before) + *@exception TimeoutException wait timed out exception + *@exception InterruptedException exception during wait + */ + public synchronized void acquire (TimeValue tv) throws + TimeoutException, InterruptedException + { + this.monitor_.timedWait (tv); + this.monitor_.inUse (true); + } + + /** + * Release the mutex. + */ + public synchronized void release () + { + this.monitor_.inUse (false); + this.monitor_.signal (); + } + + private TimedWaitMAdapter monitor_ = new TimedWaitMAdapter (this); + // The monitor (adapter) to wait on +} diff --git a/java/src/OS.java b/java/src/OS.java new file mode 100644 index 00000000000..3c762a059e6 --- /dev/null +++ b/java/src/OS.java @@ -0,0 +1,72 @@ +/************************************************* + * + * = PACKAGE + * ACE.OS + * + * = FILENAME + * OS.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.OS; + +import java.util.StringTokenizer; + +/** + * <hr> + * <h2>SYNOPSIS</h2> + *<blockquote> + * Methods to extend the capabilities of the Java runtime system. + *</blockquote> + * + * <h2>DESCRIPTION</h2> + *<blockquote> + * This non-instantiable class contains little <q>utility functions</q> + * that should have been in Java to begin with :-) + *</blockquote> + */ +public class OS +{ + /** + * Create an array of Strings from a single String using <delim> as + * the delimiter. + *@param args the String to break up to make an array of Strings + *@param delim the delimeter to use to break the String up + *@return an array containing the original String broken up + */ + public static String [] createStringArray (String args, String delim) + { + // First determine the number of arguments + int count = 0; + StringTokenizer tokens = new StringTokenizer (args, delim); + while (tokens.hasMoreTokens ()) + { + tokens.nextToken (); + count++; + } + if (count == 0) + return null; + + // Create argument array + String [] argArray = new String [count]; + int index = 0; + tokens = new StringTokenizer (args, " "); + while (tokens.hasMoreTokens ()) + { + argArray [index] = tokens.nextToken (); + index++; + } + + // Assert index == count + if (index != count) + return null; + else + return argArray; + } + + // Default private constructor to avoid instantiation + private OS () + { + } +} diff --git a/java/src/ProfileTimer.java b/java/src/ProfileTimer.java new file mode 100644 index 00000000000..6ce0d7ae811 --- /dev/null +++ b/java/src/ProfileTimer.java @@ -0,0 +1,48 @@ +/************************************************* + * + * = PACKAGE + * ACE.Timers + * + * = FILENAME + * ProfileTimer.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Timers; + +/** + * <hr> + * <p><b>TITLE</b><br> + * A Java wrapper for interval timers. + */ +public class ProfileTimer +{ + /** + * Start the timer. + */ + public void start () + { + this.startTime_ = java.lang.System.currentTimeMillis (); + } + + /** + * Stop the timer. + */ + public void stop () + { + this.stopTime_ = java.lang.System.currentTimeMillis (); + } + + /** + * Determine elapsed time between start and stop. + *@return Total elapsed time (stop - start). + */ + public long elapsedTime () + { + return this.stopTime_ - this.startTime_; + } + + private long startTime_; + private long stopTime_; +} diff --git a/java/src/RWMutex.java b/java/src/RWMutex.java new file mode 100644 index 00000000000..8421e61a73e --- /dev/null +++ b/java/src/RWMutex.java @@ -0,0 +1,104 @@ +/************************************************* + * + * = PACKAGE + * ACE.Concurrency + * + * = FILENAME + * RWMutex.java + * + *@author Ross Dargahi (rossd@krinfo.com) and Prashant Jain + * + *************************************************/ +package ACE.Concurrency; + +/******************************************************************************* +* <HR> +* <B> Description </B> +* <BR> +* This class increments a read/write lock. A read/write lock allows multiple +* readers or a single writer to access the guarded element. +* </PRE><P><HR> +* <B> Notes </B> +* <UL> +* <LI> This class does not support recursive semantics +* </UL> +*******************************************************************************/ +public class RWMutex +{ + /** + * Acquires the write lock + * @exception InterruptedException Lock acquisition interrupted + **/ + public void acquire() + throws InterruptedException + { + acquireWrite(); + } + + /** + * Acquires the read lock + * @exception InterruptedException Lock acquisition interrupted + **/ + public synchronized void acquireRead() + throws InterruptedException + { + // Wait till there is an active writer, wait. + while (this.mWriterActive_) + wait(); + + this.mNumReaders_++; + } + + /** + * Acquires the write lock + * @exception InterruptedException Lock acquisition interrupted + **/ + public synchronized void acquireWrite() + throws InterruptedException + { + // If there is an active writer before us, then wait for it to finish + // before proceeding + while (this.mWriterActive_) + wait(); + + // Set the writer active flag to true, then wait for all readers to finish + // with the lock. Note that no new readers will be able to grab the lock + // since they will be blocking on the writer active flag in acquireRead() + this.mWriterActive_ = true; + + while (this.mNumReaders_ > 0) + wait(); + + this.mWriterHoldsLock_ = true; + } + + /** + * Release held lock + * @exception InterruptedException Lock acquisition interrupted + **/ + public synchronized void release() + { + if (this.mWriterHoldsLock_) + { + this.mWriterActive_ = false; + this.mWriterHoldsLock_ = false; + } + else + { + this.mNumReaders_--; + } + + notifyAll(); + + } + + private int mNumReaders_; + // Current number of readers + + private boolean mWriterActive_; + // If true, a writer is active + + private boolean mWriterHoldsLock_; + // If true, a writer holds the lock +} + diff --git a/java/src/SOCKAcceptor.java b/java/src/SOCKAcceptor.java new file mode 100644 index 00000000000..a8e8f6d0994 --- /dev/null +++ b/java/src/SOCKAcceptor.java @@ -0,0 +1,114 @@ +/************************************************* + * + * = PACKAGE + * ACE.SOCK_SAP + * + * = FILENAME + * SOCKAcceptor.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.SOCK_SAP; + +import java.io.*; +import java.net.*; +import ACE.OS.*; + +/** + * <hr> + * <p><b>TITLE</b><br> + * Defines the format and interface for an ACE.SOCKAcceptor. + */ +public class SOCKAcceptor +{ + // = Initialization + + /** + * Create a SOCKAcceptor. Do nothing constructor. Allows user to + * call open() later and pass in the port number. + */ + public SOCKAcceptor () + { + } + + /** + * Create a SOCKAcceptor. + *@param port port number where the server will listen for connections + */ + public SOCKAcceptor (int port) throws IOException + { + this.open (port); + } + + /** + * Create socket to listen for connections on. + *@param port port number where the server will listen for connections + */ + public void open(int port) throws IOException + { + // Close old socket (if there is one) + this.close (); + + // Create a new server socket + this.listenSocket_ = new ServerSocket (port); + // ACE.DEBUG ("Server listening on port " + port); + } + + /** + * Close the socket and do any necessary cleanup. + */ + public void close () throws IOException + { + if (this.listenSocket_ != null) + { + this.listenSocket_.close (); + this.listenSocket_ = null; + } + } + + /** + * Accept a connection. The streams are set when the method returns. + *@param sockStream SOCK Stream to use for the connection + */ + public void accept (SOCKStream sockStream) throws SocketException, IOException + { + // Block in accept. Returns when a connection shows up and sets + // the streams + sockStream.socket (this.listenSocket_.accept ()); + ACE.DEBUG ("Accepted connection from " + + sockStream.socket ().getInetAddress ()); + } + + /** + * Get the underlying listen socket. + *@return the underlying listen socket + */ + public ServerSocket listenSocket () + { + return this.listenSocket_; + } + + /** + * Set the underlying listen socket. + *@param s the underlying listen socket + */ + public void listenSocket (ServerSocket s) + { + this.listenSocket_ = s; + } + + /** + * Clean up when the garbage collector gets run (if at all). Note + * that there is no guarantee that finalize () will get called. + */ + protected void finalize () throws Throwable + { + super.finalize (); + this.close (); + } + + // Socket on which listen for connections (by default initialized to + // null) + private ServerSocket listenSocket_; +} diff --git a/java/src/SOCKConnector.java b/java/src/SOCKConnector.java new file mode 100644 index 00000000000..8b94b22db47 --- /dev/null +++ b/java/src/SOCKConnector.java @@ -0,0 +1,63 @@ +/************************************************* + * + * = PACKAGE + * ACE.SOCK_SAP + * + * = FILENAME + * SOCKConnector.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.SOCK_SAP; + +import java.io.*; +import java.net.*; +import ACE.OS.*; + +/** + * <hr> + * <p><b>TITLE</b><br> + * Defines an active connection factory for the socket wrappers. + */ +public class SOCKConnector +{ + // = Initialization + + /** + * Create a SOCKConnector. Do nothing constructor. Allows user to + * call connect() later. + */ + public SOCKConnector () + { + // Do nothing constructor + } + + /** + * Create a SOCKConnector and connect to the server. + *@param sockStream SOCK Stream to use for the connection + *@param hostname hostname of the server + *@param port port number to connect with server at + */ + public SOCKConnector (SOCKStream sockStream, + String hostname, + int port) throws SocketException, IOException + { + this.connect (sockStream, + hostname, + port); + } + + /** + * Connect to the server. + *@param sockStream SOCK Stream to use for the connection + *@param hostname hostname of the server + *@param port port number to connect with server at + */ + public void connect (SOCKStream sockStream, + String hostname, + int port) throws SocketException, IOException + { + sockStream.socket (new Socket (hostname, port)); + } +} diff --git a/java/src/SOCKStream.java b/java/src/SOCKStream.java new file mode 100644 index 00000000000..d944a8fc8d2 --- /dev/null +++ b/java/src/SOCKStream.java @@ -0,0 +1,196 @@ +/************************************************* + * + * = PACKAGE + * ACE.SOCK_SAP + * + * = FILENAME + * SOCKStream.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.SOCK_SAP; + +import java.io.*; +import java.net.*; +import ACE.OS.*; + +/** + * <hr> + * <p><b>TITLE</b><br> + * Defines the methods in the ACE.SOCKStream abstraction. + * + * <p><b>DESCRIPTION</b><br> + * This adds additional wrapper methods atop the java Socket class. + */ +public class SOCKStream +{ + /** + * Create a default SOCK Stream. Do nothing constructor. + */ + public SOCKStream () + { + } + + /** + * Create a SOCK Stream. + *@param s Socket to initialize SOCK Stream with. + */ + public SOCKStream (Socket s) throws IOException + { + this.socket (s); + } + + /** + * Set the socket and the underlying streams. + *@param s Socket associated with the SOCK Stream. + */ + public void socket (Socket s) throws IOException + { + // Note that if s is not a valid socket or is null, the + // following calls will throw exceptions + this.iStream_ = new DataInputStream (s.getInputStream ()); + this.oStream_ = new PrintStream (s.getOutputStream ()); + this.socket_ = s; + } + + /* Get the underlying Socket. + *@return the underlying socket + */ + public Socket socket () + { + return this.socket_; + } + + /** + * Close the streams and the underlying socket. + */ + public void close () throws IOException + { + if (this.socket_ != null) + this.socket_.close (); + this.socket_ = null; + } + + // = The following send and recv methods are overloaded to provide a + // flexible interface + + /** + * Send a StringBuffer. Note that the method blocks. + *@param s the StringBuffer to send + *@return the length of the StringBuffer + */ + public int send (StringBuffer s) throws IOException + { + // Get the data out + String buf = s.toString (); + this.oStream_.println (buf); + this.oStream_.flush (); + return buf.length (); + } + + /** + * Send a String. Note that the method blocks. + *@param s the String to send + *@return the length of the String + */ + public int send (String s) throws IOException + { + this.oStream_.println (s); + this.oStream_.flush (); + return s.length (); + } + + /** + * Send an array of bytes. Note that the method blocks. + *@param b array of bytes to send + *@param offset offset into the byte array to start sending from + *@param length number of bytes to send + *@return number of bytes sent + */ + public int sendN (byte[] b, int offset, int length) throws IOException + { + this.oStream_.write (b, offset, length); + this.oStream_.flush (); + return length; + } + + /** + * Receive data and append it to the StringBuffer that was passed + * in. Note that the method blocks. + *@param s the StringBuffer to append the result of the recv to + *@return the length of the String received + */ + public int recv (StringBuffer s) throws IOException + { + String temp = this.iStream_.readLine (); + s.append (temp); + return temp.length (); + } + + /** + * Receive an array of characters. This method blocks until either + * all the bytes are read, the end of the stream is detected, or + * an exception is thrown. + *@param b byte array to receive the data in + *@param offset the start offset of the data in the byte array. + *@param n number of bytes to receive + *@return n + */ + public int recvN (byte[] b, int offset, int n) throws IOException + { + this.iStream_.readFully (b, offset, n); + return n; + } + + /** + * Set the underlying input stream. + *@param iStream the input stream + */ + public void inputStream (InputStream iStream) + { + this.iStream_ = new DataInputStream (iStream); + } + + /** + * Get the underlying input stream. + *@return the underlying input stream + */ + public InputStream inputStream () + { + return this.iStream_; + } + + /** + * Set the underlying output stream. + *@param iStream the output stream + */ + public void outputStream (OutputStream oStream) + { + this.oStream_ = new PrintStream (oStream); + } + + /** + * Get the underlying output stream. + *@return the underlying output stream + */ + public OutputStream outputStream () + { + return this.oStream_; + } + + /** + * Cleanup when the SOCK Stream is garbage collected. + */ + protected void finalize () throws Throwable + { + super.finalize (); + this.close (); + } + + private Socket socket_; + + // = The input and output streams (by default null) + private DataInputStream iStream_; + private PrintStream oStream_; +} diff --git a/java/src/Semaphore.java b/java/src/Semaphore.java new file mode 100644 index 00000000000..b7a587b30d8 --- /dev/null +++ b/java/src/Semaphore.java @@ -0,0 +1,104 @@ +/************************************************* + * + * = PACKAGE + * ACE.Concurrency + * + * = FILENAME + * Semaphore.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Concurrency; + +import java.util.*; +import ACE.ASX.*; +import ACE.Reactor.*; + +class TimedWaitSAdapter extends TimedWait +{ + TimedWaitSAdapter (Object obj) + { + super (obj); + } + + // Check to see if there are any semaphores available. + public boolean condition () + { + return this.count_ > 0; + } + + // Increment the count by one + public void increment () + { + this.count_++; + } + + // Decrement the count by one + public void decrement () + { + this.count_--; + } + + // Set the count + public void count (int c) + { + this.count_ = c; + } + + private int count_ = 0; +} + +/** + * <hr> + * <p><b>TITLE</b><br> + * Implementation of a semaphore in java. + */ +public class Semaphore +{ + /** + * Create a Semaphore. + *@param count semaphore count + */ + public Semaphore (int c) + { + this.monitor_.count (c); + } + + /** + * Acquire the Semaphore. Note that this will block. + *@exception InterruptedException exception during wait + */ + public synchronized void acquire () throws InterruptedException + { + this.monitor_.timedWait (); + this.monitor_.decrement (); + } + + /** + * Acquire the Semaphore. Note that the call will return if <timeout> + * amount of time expires. + *@param tv amount of time (TimeValue) to wait before returning + * (unless operation completes before) + *@exception TimeoutException wait timed out exception + *@exception InterruptedException exception during wait + */ + public synchronized void acquire (TimeValue tv) throws + TimeoutException, InterruptedException + { + this.monitor_.timedWait (tv); + this.monitor_.decrement (); + } + + /** + * Release the Semaphore. + */ + public synchronized void release () + { + this.monitor_.increment (); + this.monitor_.signal (); + } + + private TimedWaitSAdapter monitor_ = new TimedWaitSAdapter (this); + // The monitor (adapter) to wait on +} diff --git a/java/src/ServiceConfig.java b/java/src/ServiceConfig.java new file mode 100644 index 00000000000..8ffa26d9e98 --- /dev/null +++ b/java/src/ServiceConfig.java @@ -0,0 +1,168 @@ +/************************************************* + * + * = PACKAGE + * ACE.ServiceConfigurator + * + * = FILENAME + * ServiceConfig.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ServiceConfigurator; + +import java.io.*; +import ACE.OS.*; +import ACE.Misc.*; + +/** + * <hr> + * <p><b>TITLE</b><br> + * Provide the base class that supplies common server daemon + * operations. + */ +public class ServiceConfig +{ + public static int open (String [] args) throws + FileNotFoundException, IOException, ClassNotFoundException, + IllegalAccessException, InstantiationException + { + ServiceConfig.parseArgs (args); + if (ServiceConfig.svcRep_ == null) + ServiceConfig.svcRep_ = new ServiceRepository (); + return ServiceConfig.processDirectives (); + } + + protected static int processDirectives () throws + FileNotFoundException, IOException, ClassNotFoundException, + IllegalAccessException, InstantiationException + { + File configFile = new File (ServiceConfig.serviceConfigFile_); + + // Check if file exists and is a normal file + if (!configFile.exists () || !configFile.isFile ()) + throw new FileNotFoundException ("File " + ServiceConfig.serviceConfigFile_ + " not found"); + + // Check if the file is readable + if (!configFile.canRead ()) + throw new IOException ("File " + ServiceConfig.serviceConfigFile_ + " not readable"); + + // Set up the stream + FileInputStream fileIn = new FileInputStream (configFile); + + // Parse the file + StreamTokenizer in = new StreamTokenizer (fileIn); + + // Set '#' as comment character to be ignored and set '/' as + // ordinary character (was original comment character) + in.commentChar ('#'); + in.ordinaryChar ('/'); + + // Set characters in ASCII range 33 to 47 and ASCII range 91 to 96 + // as ordinary characters + in.wordChars ('!', '/'); // ASCII range 33 to 47 + in.wordChars ('[', '`'); // ASCII range 91 to 96 + + String className = null; + String classType = null; + String args = null; + // Create a state machine + int state = ServiceConfig.CLASS_NAME; + + while (in.nextToken () != StreamTokenizer.TT_EOF) + { + switch (state) + { + case ServiceConfig.CLASS_NAME: + if (in.ttype == StreamTokenizer.TT_WORD) + className = in.sval; + else + throw new IOException ("Illegal CLASS NAME argument in file " + ServiceConfig.serviceConfigFile_); + state = ServiceConfig.CLASS_TYPE; + break; + case ServiceConfig.CLASS_TYPE: + if (in.ttype == StreamTokenizer.TT_WORD) + classType = in.sval; + else + throw new IOException ("Illegal CLASS TYPE argument in file " + ServiceConfig.serviceConfigFile_); + state = ServiceConfig.ARGS; + // Set space to be an ordinary character to allow + // arguments to be parsed in + in.wordChars (' ', ' '); + break; + case ServiceConfig.ARGS: + if (in.ttype == StreamTokenizer.TT_WORD) + { + args = in.sval; + // Load the class + Class c = ServiceConfig.svcRep_.load (className); + + // Note that c should be defined else an exception + // would have been thrown by now + + // Figure out the type of the class, create an + // instance and then invoke the initialization method + if (classType.equals ("ServiceObject")) + { + ServiceObject svcObj = (ServiceObject) c.newInstance (); + + // Create an array of String from args String + String [] argArray = OS.createStringArray (args, " "); + + // Call init on the Service Object passing in arguments + svcObj.init (argArray); + } + else + throw new IOException ("Illegal CLASS TYPE argument in file " + ServiceConfig.serviceConfigFile_); + } + else + throw new IOException ("Illegal ARGS argument in file " + ServiceConfig.serviceConfigFile_); + state = ServiceConfig.CLASS_NAME; + // Set space back to whitespace-character to extract the + // next token + in.whitespaceChars (' ', ' '); + break; + default: + throw new IOException ("Illegal argument in file " + ServiceConfig.serviceConfigFile_); + } + } + return 0; + } + + protected static void parseArgs (String [] args) + { + GetOpt getopt = new GetOpt (args, "bdnf:"); + for (int c; (c = getopt.next ()) != -1; ) + switch (c) + { + case 'b': + // Note: not supported yet! + ServiceConfig.beADaemon_ = true; + break; + case 'd': + ServiceConfig.debug_ = true; + break; + case 'n': + ServiceConfig.noDefaults_ = true; + break; + case 'f': + ServiceConfig.serviceConfigFile_ = getopt.optarg (); + break; + default: + ACE.ERROR ((char ) c + " is not a ServiceConfig option"); + break; + } + } + + // Set by command line options + private static boolean beADaemon_ = false; + private static boolean debug_ = false; + private static boolean noDefaults_ = false; + private static String serviceConfigFile_ = "svc.conf"; + private static ServiceRepository svcRep_ = null; + + // States for the state-machine used in parsing the config file + private final static int CLASS_NAME = 0; + private final static int CLASS_TYPE = 1; + private final static int ARGS = 2; +} diff --git a/java/src/ServiceObject.java b/java/src/ServiceObject.java new file mode 100644 index 00000000000..7a6f8d2e909 --- /dev/null +++ b/java/src/ServiceObject.java @@ -0,0 +1,61 @@ +/************************************************* + * + * = PACKAGE + * ACE.ServiceConfigurator + * + * = FILENAME + * ServiceObject.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ServiceConfigurator; + +import java.io.*; +import ACE.Reactor.*; + +public class ServiceObject implements EventHandler +{ + /** + * Initialize object when dynamic loading occurs. Overwrite this + * method to do anything useful. + *@return -1 (default implementation) + */ + public int init (String [] args) + { + return -1; + } + + /** + * Terminate the object. Note that an object can not be explicitly + * unloaded. Overwrite this method to do anything useful. + *@return -1 (default implementation) + */ + public int fini () + { + return -1; + } + + /** + * Get information on an active object. Overwrite this method to do + * anything useful. + *@return null (default implementation) + */ + public String info () + { + return null; + } + + /** + * Called when timer expires. Overwrite this method to do + * anything useful. + *@param tv Time Value for which timer was set + *@param obj An arbitrary object that was passed to the Timer Queue + * (Asynchronous Completion Token) + *@return -1 + */ + public int handleTimeout (TimeValue tv, Object obj) + { + return -1; + } +} diff --git a/java/src/ServiceRepository.java b/java/src/ServiceRepository.java new file mode 100644 index 00000000000..9ae7ed12668 --- /dev/null +++ b/java/src/ServiceRepository.java @@ -0,0 +1,151 @@ +/************************************************* + * + * = PACKAGE + * ACE.ServiceConfigurator + * + * = FILENAME + * ServiceRepository.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ServiceConfigurator; + +import java.io.*; +import java.util.*; +import ACE.OS.*; + +public class ServiceRepository +{ + /** + * Create a Service Repository. + */ + public ServiceRepository () + { + this.loader_ = new ServiceLoader (); + } + + /** + * Load a Java Class. This method first checks if the class is + * already in the repository. If it is not, it checks if the class + * is a system class and can be found on the local system. If both + * these fail, the method tries to load the bytes for the class and + * then create the Class from these bytes. + *@param name name of the class to find in the repository + *@return Class corresponding to name + */ + public Class load (String name) throws ClassNotFoundException + { + return this.loader_.loadClass (name, true); + } + + private ServiceLoader loader_; +} + +class ServiceLoader extends ClassLoader +{ + public ServiceLoader () + { + super (); + this.getClassPath (); + } + + // Load a compiled class file into runtime + public Class loadClass (String name, boolean resolve) throws ClassNotFoundException + { + Class newClass; + try + { + // Try to load it the class by reading in the bytes. + // Note that we are not catching ClassNotFoundException here + // since our caller will catch it. + try + { + byte[] buf = bytesForClass (name); + newClass = defineClass (buf, 0, buf.length); + // ACE.DEBUG ("Loaded class: "+ name); + + // Check if we need to load other classes referred to by this class. + if (resolve) + resolveClass (newClass); + } + catch (ClassNotFoundException e) + { + // ACE.DEBUG ("Using default loader for class: "+ name); + // Try default system loader + if ((newClass = findSystemClass (name)) != null) + return newClass; + else + throw (e); // Rethrow the exception + } + } + catch (IOException e) + { + throw new ClassNotFoundException (e.toString ()); + } + return newClass; + } + + private void getClassPath () + { + // Cache system properties that are needed when trying to find a + // class file + this.classPath_ = System.getProperty ("java.class.path", "."); + this.pathSeparator_ = System.getProperty ("path.separator", ":"); + this.fileSeparator_ = System.getProperty ("file.separator", "/"); + } + + // Read a file into a byte array + private byte[] bytesForClass (String name) throws IOException, ClassNotFoundException + { + // Break up the CLASSPATH to check for existence of file in each + // sub-path. Note that we use the path_separator to extract every + // sub-path and use that to check if the class file exists. + StringTokenizer tokens = new StringTokenizer (this.classPath_, + this.pathSeparator_); + while (tokens.hasMoreTokens ()) + { + // Create a File object which can then be used to see if the + // class file actually exists + File classFile = new File (tokens.nextToken () + + this.fileSeparator_ + + name + + ".class"); + + // Check if file exists, is a normal file and is readable + if (classFile.exists () && + classFile.isFile () && + classFile.canRead ()) + { + // Set up the stream + FileInputStream in = new FileInputStream (classFile); + + // Get byte count + int length = in.available (); + + if (length == 0) + throw new ClassNotFoundException (name); + + // Create an array of bytes to read the file in + byte[] buf = new byte[length]; + + // Read the file + in.read (buf); + + // Return byte array + return buf; + } + } + // File was not found so throw an exception. + throw new ClassNotFoundException ("Class file " + name + " not found"); + } + + private String classPath_; + // Class path that is loaded at construction + + private String pathSeparator_; + // Platform-dependent path separator (e.g., : or ;) + + private String fileSeparator_; + // Platform-dependent file separator (e.g., / or \) +} diff --git a/java/src/StrategyAcceptor.java b/java/src/StrategyAcceptor.java new file mode 100644 index 00000000000..2febad428fa --- /dev/null +++ b/java/src/StrategyAcceptor.java @@ -0,0 +1,148 @@ +/************************************************* + * + * = PACKAGE + * ACE.Connection + * + * = FILENAME + * StrategyAcceptor.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Connection; + +import java.io.*; +import java.net.*; +import ACE.OS.*; +import ACE.SOCK_SAP.*; + +public class StrategyAcceptor +{ + /** + * Create an instance of StrategyAcceptor. + *@param handlerFactory Svc Handler factory that is used to create + * an instance of a Svc Handler when a connection is accepted. + */ + public StrategyAcceptor (Class handlerFactory) + { + this (handlerFactory, null, null, null); + } + + /** + * Create an instance of StrategyAcceptor. Use the creation + * strategy and the handlerFactory passed in to creae a new instance + * of the Svc Handler. + *@param handlerFactory Svc Handler factory that is used to create + * an instance of a Svc Handler when a connection is accepted. + *@param creStrategy Creation strategy to use to create a new + * instance of the Svc Handler. + *@param acceptStrategy Accept strategy to use to accept a new + * connection into the Svc Handler. + *@param activateStrategy Activate strategy to use to activate the + * instance of the Svc Handler. + */ + public StrategyAcceptor (Class handlerFactory, + CreationStrategy creStrategy, + AcceptStrategy acceptStrategy, + ActivateStrategy activateStrategy) + { + // Cache everything + this.handlerFactory_ = handlerFactory; + this.creStrategy_ = creStrategy; + this.acceptStrategy_ = acceptStrategy; + this.activateStrategy_ = activateStrategy; + } + + /** + * Initialize the Strategy Acceptor. The method creates the + * appropriate strategies as needed. + *@param port port number where the server will listen for connections + */ + public void open (int port) throws IOException + { + if (this.creStrategy_ == null) + this.creStrategy_ = new CreationStrategy (this.handlerFactory_); + if (this.acceptStrategy_ == null) + this.acceptStrategy_ = new AcceptStrategy (port); + else + this.acceptStrategy_.open (port); + if (this.activateStrategy_ == null) + this.activateStrategy_ = new ActivateStrategy (); + } + + /** + * Accept a connection using the appropriate strategies. + */ + public void accept () throws SocketException, + InstantiationException, + IllegalAccessException, + IOException + { + // Create a Svc_Handler using the appropriate Creation_Strategy + SvcHandler sh = this.makeSvcHandler (); + + // Accept a connection into the Svc_Handler + this.acceptSvcHandler (sh); + + // Activate the Svc_Handler + this.activateSvcHandler (sh); + } + + /** + * Bridge method for creating a SvcHandler. The strategy for + * creating a SvcHandler is configured into the Acceptor via it's + * creStrategy_. If no strategy is passed in, the default behavior + * of this method is to use the default CreationStrategy. + *@return a new instance of the Svc Handler + */ + protected SvcHandler makeSvcHandler () throws InstantiationException, + IllegalAccessException + { + // Create a new handler for the connection + return this.creStrategy_.makeSvcHandler (); + } + + + /** + * Bridge method for accepting the new connection into the + * <SvcHandler>. The strategy for accepting into a SvcHandler is + * configured into the Acceptor via it's acceptStrategy_. If no + * strategy is passed in, the default behavior of this method is to + * use the default AcceptStrategy. + *@param sh Svc Handler in which to accept the connection + *@return result of accepting a connection using the accept strategy + */ + protected int acceptSvcHandler (SvcHandler sh) throws + SocketException, IOException + { + // Delegate responsibility to the appropriate strategy + return this.acceptStrategy_.acceptSvcHandler (sh); + } + + /** + * Bridge method for activating a <SvcHandler>. The strategy for + * activating a SvcHandler is configured into the Acceptor via it's + * activateStrategy_. If no strategy is passed in, the default + * behavior of this method is to use the default ActivateStrategy. + *@param sh Svc Handler to activate + *@return result of activating the Svc Handler + */ + protected int activateSvcHandler (SvcHandler sh) + { + // Delegate responsibility to the appropriate strategy + return this.activateStrategy_.activateSvcHandler (sh); + } + + // Handler class that should be instantiated when a connection is + // made with a client + private Class handlerFactory_; + + // Creation Strategy + private CreationStrategy creStrategy_; + + // Accept Strategy + private AcceptStrategy acceptStrategy_; + + // Activation Strategy + private ActivateStrategy activateStrategy_; +} diff --git a/java/src/SvcHandler.java b/java/src/SvcHandler.java new file mode 100644 index 00000000000..39485dbf4e6 --- /dev/null +++ b/java/src/SvcHandler.java @@ -0,0 +1,85 @@ +/************************************************* + * + * = PACKAGE + * ACE.Connection + * + * = FILENAME + * SvcHandler.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Connection; + +import java.io.*; +import java.net.*; +import ACE.SOCK_SAP.*; +import ACE.ASX.*; +import ACE.Reactor.*; + +public abstract class SvcHandler extends Task +{ + + /** + * Do nothing constructor. + */ + public SvcHandler () + { + } + + /** + * Set the stream using the SOCKStream passed in. This sets the + * underlying peer + *@param s SOCK Stream to use for the connection + */ + public void setHandle (SOCKStream s) throws IOException + { + this.stream_ = s; + } + + /** + * Get the underlying peer + *@return the underlying peer + */ + public SOCKStream peer () + { + return this.stream_; + } + + /** + * Abstract method that subclasses must define to allow + * initialization to take place. + */ + public abstract int open (Object obj); + + /** + * Provide a default implementation to simplify ancestors. + *@return 0 + */ + public int close (long flags) + { + return 0; + } + + /** + * Provide a default implementation to simplify ancestors. + *@return -1 + */ + public int put (MessageBlock mb, TimeValue tv) + { + return -1; + } + + /** + * Provide a default implementation to simplify ancestors. + *@param tv Time Value for which timer was set + *@param obj An arbitrary object that was passed to the Timer Queue + * (Asynchronous Completion Token) + */ + public int handleTimeout (TimeValue tv, Object obj) + { + return -1; + } + + private SOCKStream stream_; +} diff --git a/java/src/Task.java b/java/src/Task.java new file mode 100644 index 00000000000..0c1c41ec826 --- /dev/null +++ b/java/src/Task.java @@ -0,0 +1,325 @@ +/************************************************* + * + * = PACKAGE + * ACE.ASX + * + * = FILENAME + * Task.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ASX; + +import ACE.OS.*; +import ACE.Reactor.*; +import ACE.Concurrency.*; + +public abstract class Task implements Runnable, EventHandler +{ + // = Initialization/termination methods. + + /** + * Initialize a Task. Note, we allocate a message queue ourselves. + */ + public Task () + { + this.msgQueue_ = new MessageQueue (); + this.thrMgr_ = null; + } + + /** + * Initialize a Task. Note, we use the message queue and thread + * manager supplied by the user. + *@param mq Message Queue to hold list of messages on the Task + *@param thrMgr Thread Manager that manages all the spawned threads + */ + public Task (MessageQueue mq, + ThreadManager thrMgr) + { + this.msgQueue_ = mq; + this.thrMgr_ = thrMgr; + } + + /** + * Not meant to be invoked by the user directly!. This needs to be + * in the public interface in order to get invoked by Thread + * class. + */ + public void run () + { + this.svc (); + } + + // = Initialization and termination hooks (note that these *must* be + // defined by subclasses). + + /** + * Hook called to open a Task. + *@param obj used to pass arbitrary information + */ + public abstract int open (Object obj); + + /** + * Hook called to close a Task. + */ + public abstract int close (long flags); + + // = Immediate and deferred processing methods, respectively. + + /** + * Transfer a message into the queue to handle immediate + * processing. + *@param mb Message Block to handle immediately + *@param tv amount of time to wait for + */ + public abstract int put (MessageBlock mb, TimeValue tv); + + /** + * Run by a daemon thread to handle deferred processing. Note, that + * to do anything useful, this method should be overriden by the + * subclass. + *@return default implementation always returns 0. + */ + public int svc () + { + return 0; + } + + // = Active object method. + + /** + * Turn the task into an active object. That is, having <nThreads> + * separate threads of control that all invoke Task::svc. + *@param flags Task Flags + *@param nThreads number of threads to spawn + *@param forceActive whether to force creation of new threads or not + *@return -1 if failure occurs, 1 if Task is already an active + * object and <forceActive> is false (doesn't *not* create a new + * thread in this case), and 0 if Task was not already an active + * object and a thread is created successfully or thread is an active + * object and <forceActive> is true. + */ + public synchronized int activate (long flags, int nThreads, boolean forceActive) + { + if (this.thrCount () > 0 && forceActive == false) + return 1; // Already active. + this.flags_ = flags; + + // Create a Thread Manager if we do not already have one + if (this.thrMgr_ == null) + this.thrMgr_ = new ThreadManager (); + + if (ACE.BIT_ENABLED (flags, TaskFlags.THR_DAEMON)) + this.thrMgr_.spawnN (nThreads, this, true); // Spawn off all threads as daemon threads + else // Spawn off all threads as normal threads + this.thrMgr_.spawnN (nThreads, this, false); + + return 0; + } + + // = Suspend/resume a Task + + /** + * Suspend a task. + */ + public synchronized void suspend () + { + // Suspend all threads + this.thrMgr_.thrGrp ().suspend (); + } + + /** + * Resume a suspended task. + */ + public synchronized void resume () + { + // Resume all threads + this.thrMgr_.thrGrp ().resume (); + } + + /** + * Get the current group name. + *@return name of the current thread group + */ + public synchronized String grpName () + { + return this.thrMgr_.thrGrp ().getName (); + } + + /** + * Get the message queue associated with this task. + *@return the message queue associated with this task. + */ + public MessageQueue msgQueue () + { + return this.msgQueue_; + } + + /** + * Set the message queue associated with this task. + *@param mq Message Queue to use with this Task. + */ + public void msgQueue (MessageQueue mq) + { + this.msgQueue_ = mq; + } + + /** + * Get the number of threads currently running within the Task. + *@return the number of threads currently running within the Task. + * 0 if we're a passive object, else > 0. + */ + public synchronized int thrCount () + { + return this.thrMgr_.thrGrp ().activeCount (); + } + + // = Message queue manipulation methods. + + /** + * Insert message into the message queue. + *@param mb Message Block to insert into the Message Queue + *@param tv amount of time to wait for + */ + protected int putq (MessageBlock mb, TimeValue tv) throws InterruptedException + { + return this.msgQueue_.enqueueTail (mb, tv); + } + + /** + * Extract the first message from the queue. Note that the call is blocking. + *@return the first Message Block from the Message Queue. + *@param tv amount of time to wait for + */ + protected MessageBlock getq (TimeValue tv) throws InterruptedException + { + return this.msgQueue_.dequeueHead (tv); + } + + /** + * Return a message back to the queue. + *@param mb Message Block to return back to the Message Queue + *@param tv amount of time to wait for + */ + protected int ungetq (MessageBlock mb, TimeValue tv) throws InterruptedException + { + return this.msgQueue_.enqueueHead (mb, tv); + } + + /** + * Transfer message to the adjacent ACETask in an ACEStream. + *@param mb Message Block to transfer to the adjacent Task + *@param tv amount of time to wait for + *@return -1 if there is no adjacent Task, else the return value of + * trying to put the Message Block on that Task's Message Queue. + */ + protected int putNext (MessageBlock mb, TimeValue tv) + { + return this.next_ == null ? -1 : this.next_.put (mb, tv); + } + + /** + * Turn the message back around. Puts the message in the sibling's + * Message Queue. + *@param mb Message Block to put into sibling's Message Queue + *@param tv amount of time to wait for + *@return -1 if there is no adjacent Task to the sibling, else the + * return value of trying to put the Message Block on sibling's + * Message Queue. + */ + protected int reply (MessageBlock mb, TimeValue tv) + { + return this.sibling ().putNext (mb, tv); + } + + // = ACE_Task utility routines to identify names et al. + + /** + * Get the name of the enclosing Module. (NOT YET IMPLEMENTED) + *@return the name of the enclosing Module if there's one associated + * with the Task, else null. + */ + protected String name () + { + // Needs to be implemented once Module class has been implemeted + return null; + } + + /** + * Get the Task's sibling. (NOT YET IMPLEMENTED) + *@return the Task's sibling if there's one associated with the + * Task's Module, else null. + */ + protected Task sibling () + { + // Needs to be implemented once Module class has been implemeted + return null; + } + + /** + * Check if queue is a reader. + *@return true if queue is a reader, else false. + */ + protected boolean isReader () + { + return (ACE.BIT_ENABLED (this.flags_, TaskFlags.ACE_READER)); + } + + /** + * Check if queue is a writer. + *@return true if queue is a writer, else false. + */ + protected boolean isWriter () + { + return (ACE.BIT_DISABLED (this.flags_, TaskFlags.ACE_READER)); + } + + // = Pointers to next ACE_Queue (if ACE is part of an ACE_Stream). + + /** + * Get next Task pointer. + *@return pointer to the next Task + */ + protected Task next () + { + return this.next_; + } + + /** + * Set next Task pointer. + *@param task next task pointer + */ + protected void next (Task task) + { + this.next_ = task; + } + + // Special routines corresponding to certain message types. + + /** + * Flush the Message Queue + *@return 0 if Message Queue is null, 1 if flush succeeds, -1 if + * ACE_FLUSHALL bit is not enabled in flags. + */ + protected int flush (long flag) + { + if (ACE.BIT_ENABLED (flag, TaskFlags.ACE_FLUSHALL)) + return (this.msgQueue_ == null ? 0 : 1); + else + return -1; + } + + private ThreadManager thrMgr_; + // Thread_Manager that manages all the spawned threads + + private long flags_; + // Task flags. + + private MessageQueue msgQueue_; + // List of messages on the Task.. + + private Task next_; + // Adjacent ACE_Task. + +} diff --git a/java/src/TaskFlags.java b/java/src/TaskFlags.java new file mode 100644 index 00000000000..fdca5d20df6 --- /dev/null +++ b/java/src/TaskFlags.java @@ -0,0 +1,44 @@ +/************************************************* + * + * = PACKAGE + * ACE.ASX + * + * = FILENAME + * TaskFlags.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.ASX; + +public abstract class TaskFlags +{ + /** Identifies a Task as being the "reader" in a Module. */ + public static final int ACE_READER = 01; + + /** Just flush data messages in the queue. */ + public static final int ACE_FLUSHDATA = 02; + + /** Flush all messages in the Queue. */ + public static final int ACE_FLUSHALL = 04; + + /** Flush read queue */ + public static final int ACE_FLUSHR = 010; + + /** Flush write queue */ + public static final int ACE_FLUSHW = 020; + + /** Flush both queues */ + public static final int ACE_FLUSHRW = 030; + + /** Identifies a thread as suspended */ + public static final int THR_SUSPENDED = 0x00000080; + + /** Identifies a thread as a daemon thread */ + public static final int THR_DAEMON = 0x00000100; + + // Default private constructor to avoid instantiation + private TaskFlags () + { + } +} diff --git a/java/src/ThreadManager.java b/java/src/ThreadManager.java new file mode 100644 index 00000000000..6f5f48748a6 --- /dev/null +++ b/java/src/ThreadManager.java @@ -0,0 +1,109 @@ +/************************************************* + * + * = PACKAGE + * ACE.Concurrency + * + * = FILENAME + * ThreadManager.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Concurrency; + +import java.util.*; +import ACE.OS.*; + +public class ThreadManager +{ + /** + * Default constructor + */ + public ThreadManager () + { + this (ACE.DEFAULT_THREAD_GROUP_NAME); + } + + /** + * Create a Thread Manager. + *@param groupName name of the thread group that the Thread Manager + * will manage + */ + public ThreadManager (String groupName) + { + this.thrGrp_ = new ThreadGroup (groupName); + if (this.thrGrp_ == null) + ACE.ERROR ("Thread group create failed"); + } + + /** + * Create a new thread. + *@param thr the caller whose run method will be invoked when the + * thread has been spawned + *@param daemon flag indicating whether the thread should be + * spawned off as a daemon thread + */ + public void spawn (Runnable thr, + boolean daemon) + { + Thread t = new Thread (this.thrGrp_, thr); + if (daemon) // Set the thread to be a daemon thread + t.setDaemon (true); + t.start (); + } + + /** + * Create a new thread and also give it a name. + *@param thr the caller whose run method will be invoked when the + * thread has been spawned + *@param threadName the name of the new thread + *@param daemon flag indicating whether the thread should be + * spawned off as a daemon thread + */ + public void spawn (Runnable thr, + String threadName, + boolean daemon) + { + Thread t = new Thread (this.thrGrp_, thr, threadName); + if (daemon) // Set the thread to be a daemon thread + t.setDaemon (true); + t.start (); + } + + + /** + * Create <n> new threads. + *@param n the number of threads to spawn + *@param thr the caller whose run method will be invoked by each of + * the <n> threads + *@param daemon flag indicating whether the threads should be + * spawned off as daemon threads + */ + public void spawnN (int n, + Runnable thr, + boolean daemon) + { + // Spawn off all the threads. + for (int i = 0; i < n; i++) + { + this.spawn (thr, daemon); + } + } + + /** + * Get the thread group containing all the threads. Note that the + * thread group can be used to get information regarding number of + * active threads as well as to suspend/resume all the threads in + * the group. + *@return the thread group that contains all the threads managed by + * the Thread Manager + */ + public ThreadGroup thrGrp () + { + return this.thrGrp_; + } + + private ThreadGroup thrGrp_; + // Thread Group that contains all the spawned threads + +} diff --git a/java/src/TimeValue.java b/java/src/TimeValue.java new file mode 100644 index 00000000000..9234bca529a --- /dev/null +++ b/java/src/TimeValue.java @@ -0,0 +1,255 @@ +/************************************************* + * + * = PACKAGE + * ACE.Reactor + * + * = FILENAME + * TimeValue.java + * + *@author Prashant Jain + * + *************************************************/ +package ACE.Reactor; + +public class TimeValue +{ + public final static TimeValue zero = new TimeValue (0,0); + + /** + * Default constructor + */ + public TimeValue () + { + this (0, 0); + } + + /** + * Constructor + *@param sec seconds + */ + public TimeValue (long sec) + { + this (sec, 0); + } + + /** + * Constructor + *@param sec seconds + *@param nanos nanoseconds + */ + public TimeValue (long sec, int nanos) + { + this.set (sec, nanos); + } + + /** + * Sets the seconds and nanoseconds of Time Value + *@param sec seconds + *@param nanos nanoseconds + */ + public void set (long sec, int nanos) + { + this.millisec_ = sec * 1000; + this.nanos_ = nanos; + this.normalize (); + } + + /** + * Get seconds + *@return Seconds + */ + public long sec () + { + return this.millisec_/1000; + } + + /** + * Get nanoseconds + *@return Nanoseconds + */ + public int nanos () + { + return this.nanos_; + } + + /** + * Get time in milliseconds. + *@return time in milliseconds + */ + public long getMilliTime () + { + return this.millisec_; + } + + /** + * Get a String representation of the Time Value. + *@return String representation of the Time Value + */ + public String toString () + { + return (new Long (this.millisec_/1000)).toString () + ":" + + (new Integer (this.nanos_)).toString (); + } + + /** + * Get current time. + *@return the current system time + */ + public static TimeValue gettimeofday () + { + return new TimeValue (System.currentTimeMillis ()); + } + + /** + * Compare two Time Values for equality. + *@param tv Time Value to compare with + *@return true if the two Time Values are equal, false otherwise + */ + public boolean equals (TimeValue tv) + { + return this.millisec_ == (tv.sec () * 1000) && this.nanos_ == tv.nanos (); + } + + /** + * Compare two Time Values for non-equality. + *@param tv Time Value to compare with + *@return true if the two Time Values are not equal, false otherwise + */ + public boolean notEquals (TimeValue tv) + { + return !this.equals (tv); + } + + /** + * Add two Time Values. + *@param tv1 The first Time Value + *@param tv2 The second Time Value + *@return sum of the two Time Values. + */ + public static TimeValue plus (TimeValue tv1, TimeValue tv2) + { + TimeValue tv = new TimeValue (tv1.sec () + tv2.sec (), + tv1.nanos () + tv2.nanos ()); + tv.normalize (); + return tv; + } + + /** + * Subtract two Time Values. + *@param tv1 The first Time Value + *@param tv2 The second Time Value + *@return difference of the two Time Values. + */ + public static TimeValue minus (TimeValue tv1, TimeValue tv2) + { + TimeValue tv = new TimeValue (tv1.sec () - tv2.sec (), + tv1.nanos () - tv2.nanos ()); + tv.normalize (); + return tv; + } + + /** + * Add Time Value to "this". + *@param tv The Time Value to add to this. + */ + public void plusEquals (TimeValue tv) + { + this.set (this.sec () + tv.sec (), + this.nanos () + tv.nanos ()); + this.normalize (); + } + + /** + * Subtract Time Value from "this". + *@param tv The Time Value to subtract from this. + */ + public void minusEquals (TimeValue tv) + { + this.set (this.sec () - tv.sec (), + this.nanos () - tv.nanos ()); + this.normalize (); + } + + /** + * Compare two Time Values for less than. + *@param tv Time Value to compare with + *@return true if "this" is less than tv, false otherwise + */ + public boolean lessThan (TimeValue tv) + { + return tv.greaterThan (this); + } + + /** + * Compare two Time Values for greater than. + *@param tv Time Value to compare with + *@return true if "this" is greater than tv, false otherwise + */ + public boolean greaterThan (TimeValue tv) + { + if (this.sec () > tv.sec ()) + return true; + else if (this.sec () == tv.sec () + && this.nanos () > tv.nanos ()) + return true; + else + return false; + } + + /** + * Compare two Time Values for <=. + *@param tv Time Value to compare with + *@return true if "this" <= tv, false otherwise + */ + public boolean lessThanEqual (TimeValue tv) + { + return tv.greaterThanEqual (this); + } + + /** + * Compare two Time Values for >=. + *@param tv Time Value to compare with + *@return true if "this" >= tv, false otherwise + */ + public boolean greaterThanEqual (TimeValue tv) + { + return this.sec () >= tv.sec () && this.nanos () >= tv.nanos (); + } + + private void normalize () + { + if (this.nanos_ >= ONE_MILLISECOND) + { + do + { + this.millisec_++; + this.nanos_ -= ONE_MILLISECOND; + } + while (this.nanos_ >= ONE_MILLISECOND); + } + else if (this.nanos_ <= -ONE_MILLISECOND) + { + do + { + this.millisec_--; + this.nanos_ += ONE_MILLISECOND; + } + while (this.nanos_ <= -ONE_MILLISECOND); + } + + if (this.millisec_ >= 1 && this.nanos_ < 0) + { + this.millisec_--; + this.nanos_ += ONE_MILLISECOND; + } + else if (this.millisec_ < 0 && this.nanos_ > 0) + { + this.millisec_++; + this.nanos_ -= ONE_MILLISECOND; + } + } + + private long millisec_; + private int nanos_; + private final long ONE_MILLISECOND = 1000000L; +} diff --git a/java/src/TimedWait.java b/java/src/TimedWait.java new file mode 100644 index 00000000000..c708eb36945 --- /dev/null +++ b/java/src/TimedWait.java @@ -0,0 +1,136 @@ +/************************************************* + * + * = PACKAGE + * ACE.ASX + * + * = FILENAME + * TimedWait.java + * + *@author Prashant Jain and Doug Schmidt + * + *************************************************/ +package ACE.ASX; + +import ACE.Reactor.*; + +public abstract class TimedWait +{ + /** + * Default Constructor. Sets "this" to be used for the delegation of + * the wait() call to. + */ + public TimedWait () + { + object_ = this; + } + + /** + * Constructor. Allows subclasses to supply us with an Object that + * is delegated the wait() call. + *@param obj The Object that is delegated the wait() call. + */ + public TimedWait (Object obj) + { + object_ = obj; + } + + /** + * Hook method that needs to be implemented by subclasses. + */ + public abstract boolean condition (); + + /** + * Wait until condition becomes true. Note that the method + * blocks. Also note that this method is final to ensure that no one + * overrides it. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + */ + public final void timedWait () throws InterruptedException + { + // Acquire the monitor lock. + if (!condition ()) + { + // Only attempt to perform the wait if the condition isn't + // true initially. + for (;;) + { + // Wait until we are notified. + object_.wait (); + + // Recheck the condition. + if (condition ()) + break; // Condition became true. + // else we were falsely notified so go back into wait + } + } + } + + /** + * Template Method that implements the actual timed wait. Note that + * this method is final to ensure that no one overrides it. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + *@param tv Amount of time to do wait for. + */ + public final void timedWait (TimeValue tv) + throws InterruptedException, + TimeoutException + { + // Acquire the monitor lock. + if (!condition ()) + { + // Only attempt to perform the timed wait if the condition isn't + // true initially. + long start = System.currentTimeMillis (); + long waitTime = tv.getMilliTime (); + + for (;;) { + // Wait until we are notified. + object_.wait (waitTime); + + // Recheck the condition. + if (!condition ()) { + long now = System.currentTimeMillis (); + long timeSoFar = now - start; + + // Timed out! + if (timeSoFar >= tv.getMilliTime ()) + throw new TimeoutException (); + else + // We still have some time left to wait, so adjust the + // wait_time. + waitTime = tv.getMilliTime () - timeSoFar; + } + else + break; // Condition became true. + } + } + } + + /** + * Notify any one thread waiting on the object_. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + */ + public final void signal () { + object_.notify (); + } + + /** + * Notify all threads waiting on the object_. + * IMPORTANT: This method assumes it is called with the object_'s + * monitor lock already held. + */ + public final void broadcast () { + object_.notifyAll (); + } + + /** + * The object we delegate to. If a subclass gives us a particular + * object, we use that to delegate to, otherwise, we ``delegate'' + * to ourself (i.e., this). + */ + protected Object object_; + +} diff --git a/java/src/TimeoutException.java b/java/src/TimeoutException.java new file mode 100644 index 00000000000..1bff8e3415c --- /dev/null +++ b/java/src/TimeoutException.java @@ -0,0 +1,36 @@ +/************************************************* + * + * = PACKAGE + * ACE.ASX + * + * = FILENAME + * TimeoutException.java + * + *@author Prashant Jain and Doug Schmidt + * + *************************************************/ +package ACE.ASX; + +import ACE.Reactor.*; + +public class TimeoutException extends Exception +{ + /** + * Default Constructor. + */ + public TimeoutException () + { + super ("Timed Out"); + } + + /** + * Constructor. + *@param timeout The timeout value which expired. + *@param desc Textual description of the exception + */ + public TimeoutException (TimeValue timeout, String desc) + { + super ("Timed Out in " + timeout + ": " + desc); + } + +} |