diff options
Diffstat (limited to 'gnu/CORBA/Functional_ORB.java')
-rw-r--r-- | gnu/CORBA/Functional_ORB.java | 311 |
1 files changed, 250 insertions, 61 deletions
diff --git a/gnu/CORBA/Functional_ORB.java b/gnu/CORBA/Functional_ORB.java index 437f420c4..c8d6a4c93 100644 --- a/gnu/CORBA/Functional_ORB.java +++ b/gnu/CORBA/Functional_ORB.java @@ -15,8 +15,8 @@ General Public License for more details. You should have received a copy of the GNU General Public License along with GNU Classpath; see the file COPYING. If not, write to the -Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -02111-1307 USA. +Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +02110-1301 USA. Linking this library statically or dynamically with other modules is making a combined work based on this library. Thus, the terms and @@ -40,13 +40,14 @@ package gnu.CORBA; import gnu.CORBA.CDR.cdrBufInput; import gnu.CORBA.CDR.cdrBufOutput; +import gnu.CORBA.GIOP.CloseMessage; import gnu.CORBA.GIOP.ErrorMessage; import gnu.CORBA.GIOP.MessageHeader; import gnu.CORBA.GIOP.ReplyHeader; import gnu.CORBA.GIOP.RequestHeader; import gnu.CORBA.NamingService.NamingServiceTransient; +import gnu.CORBA.Poa.gnuForwardRequest; -import org.omg.CORBA.BAD_INV_ORDER; import org.omg.CORBA.BAD_OPERATION; import org.omg.CORBA.BAD_PARAM; import org.omg.CORBA.CompletionStatus; @@ -77,6 +78,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Enumeration; +import java.util.Hashtable; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; @@ -86,7 +88,9 @@ import java.util.TreeMap; /** * The ORB implementation, capable to handle remote invocations on the - * registered object. + * registered object. This class implements all features, required till + * the jdk 1.3 inclusive, but does not support the POA that appears since + * 1.4. The POA is supported by {@link gnu.CORBA.Poa.ORB_1_4}. * * @author Audrius Meskauskas (AudriusA@Bioinformatics.org) */ @@ -154,7 +158,7 @@ public class Functional_ORB { try { - serve(this, service); + tick(); } catch (SocketException ex) { @@ -179,6 +183,16 @@ public class Functional_ORB } /** + * Perform a single serving step. + * @throws java.lang.Exception + */ + void tick() + throws Exception + { + serve(this, service); + } + + /** * Forcibly close the server socket and mark this port as free. */ public void close_now() @@ -205,6 +219,40 @@ public class Functional_ORB } /** + * A server, responsible for listening on requests on some + * local port and serving multiple requests (probably to the + * different objects) on the same thread. + */ + class sharedPortServer + extends portServer + { + /** + * Create a new portServer, serving on specific port. + */ + sharedPortServer(int _port) + { + super(_port); + } + + /** + * Perform a single serving step. + * @throws java.lang.Exception + */ + void tick() + throws Exception + { + Socket request = service.accept(); + serveStep(request, false); + } + } + + /** + * The default value where the first instance of this ORB will start + * looking for a free port. + */ + public static int DEFAULT_INITIAL_PORT = 1126; + + /** * The property of port, on that this ORB is listening for requests from clients. * This class supports one port per ORB only. */ @@ -266,7 +314,7 @@ public class Functional_ORB */ private int TOUT_START_READING_MESSAGE = 20 * 1000; - // (Here and below, we use * to make meaning of the constant clearler). + // (Here and below, we use * to make the meaning of the constant clearler). /** * If the client has started to send the request message, the socket time @@ -285,17 +333,11 @@ public class Functional_ORB * Some clients tend to submit multiple requests over the * same socket. The server waits for the next request on * the same socket for the duration, specified - * below. The default time is seven seconds. - */ - public int TANDEM_REQUESTS = 7000; - - /** - * If the maximal number of threads per object is reached, - * the server waits for the given time interval before checking - * again maybe some threads are already complete. - * Thr default time is 0.5 second. + * below. In additions, the request of this implementation also + * waits for the same duration before closing the socket. + * The default time is seven seconds. */ - public int PAUSE_ON_THREAD_OVERLOAD = 500; + public static int TANDEM_REQUESTS = 7000; /** * The map of the already conncted objects. @@ -318,7 +360,7 @@ public class Functional_ORB /** * The map of the initial references. */ - private Map initial_references = new TreeMap(); + protected Map initial_references = new TreeMap(); /** * The currently active portServers. @@ -331,13 +373,14 @@ public class Functional_ORB private String ns_host; /** - * The port, under that the ORB is listening for remote requests. - * Then the new object is connected, this port is used first, then - * it is incremented by 1, etc. If the given port is not available, - * up to 20 subsequent values are tried and then the parameterless - * server socket contructor is called. + * Probably free port, under that the ORB will try listening for + * remote requests first. When the new object is connected, this + * port is used first, then it is incremented by 1, etc. If the given + * port is not available, up to 20 subsequent values are tried and then + * the parameterless server socket contructor is called. The constant is + * shared between multiple instances of this ORB. */ - private static int Port = 1126; + private static int Port = DEFAULT_INITIAL_PORT; /** * The port, on that the name service is expected to be running. @@ -356,6 +399,11 @@ public class Functional_ORB protected LinkedList freed_ports = new LinkedList(); /** + * Maps a single-threaded POAs to they sharedPortServants. + */ + protected Hashtable identities = new Hashtable(); + + /** * The maximal allowed number of the currently running parallel * threads per object. For security reasons, this is made private and * unchangeable. After exceeding this limit, the NO_RESOURCES @@ -371,6 +419,7 @@ public class Functional_ORB try { LOCAL_HOST = ns_host = InetAddress.getLocalHost().getHostAddress(); + initial_references.put("CodecFactory", new gnuCodecFactory(this)); } catch (UnknownHostException ex) { @@ -471,31 +520,17 @@ public class Functional_ORB /** * Set the port, on that the server is listening for the client requests. - * In this implementation, the server is listening at only one port, - * the default value being 1126. + * If only one object is connected to the orb, the server will be + * try listening on this port first. It the port is busy, or if more + * objects are connected, the subsequent object will receive a larger + * port values, skipping unavailable ports, if required. The change + * applies globally. * * @param a_Port a port, on that the server is listening for requests. - * - * @throws BAD_INV_ORDER if the server has already been started. The port - * can only be changed when the server is not yet started. - */ - public void setPort(int a_Port) - { - if (running) - throw new BAD_INV_ORDER("The server is running"); - this.Port = a_Port; - } - - /** - * Get the port, on that the server is listening for the client requests. - * In this implementation, the server is listening at only one port, - * the default value being 1126. - * - * @return the port. */ - public int getPort() + public static void setPort(int a_Port) { - return Port; + Port = a_Port; } /** @@ -547,7 +582,8 @@ public class Functional_ORB { int a_port = getFreePort(); - Connected_objects.cObject ref = connected_objects.add(key, object, a_port); + Connected_objects.cObject ref = + connected_objects.add(key, object, a_port, null); IOR ior = createIOR(ref); prepareObject(object, ior); if (running) @@ -555,11 +591,57 @@ public class Functional_ORB } /** + * Connect the given CORBA object to this ORB, explicitly specifying + * the object key and the identity of the thread (and port), where the + * object must be served. The identity is normally the POA. + * + * The new port server will be started only if there is no one + * already running for the same identity. Otherwise, the task of + * the existing port server will be widened, including duty to serve + * the given object. All objects, connected to a single identity by + * this method, will process they requests subsequently in the same + * thread. The method is used when the expected number of the + * objects is too large to have a single port and thread per object. + * This method is used by POAs, having a single thread policy. + * + * @param object the object, must implement the {@link InvokeHandler}) + * interface. + * @param key the object key, usually used to identify the object from + * remote side. + * @param port the port, where the object must be connected. + * + * @throws BAD_PARAM if the object does not implement the + * {@link InvokeHandler}). + */ + public void connect_1_thread(org.omg.CORBA.Object object, byte[] key, + java.lang.Object identity + ) + { + sharedPortServer shared = (sharedPortServer) identities.get(identity); + if (shared == null) + { + int a_port = getFreePort(); + shared = new sharedPortServer(a_port); + identities.put(identity, shared); + if (running) + { + portServers.add(shared); + shared.start(); + } + } + + Connected_objects.cObject ref = + connected_objects.add(key, object, shared.s_port, identity); + IOR ior = createIOR(ref); + prepareObject(object, ior); + } + + /** * Start the service on the given port of this IOR. * * @param ior the ior (only Internet.port is used). */ - private void startService(IOR ior) + public void startService(IOR ior) { portServer p = new portServer(ior.Internet.port); portServers.add(p); @@ -620,7 +702,7 @@ public class Functional_ORB for (int i = 0; i < portServers.size(); i++) { p = (portServer) portServers.get(i); - if (p.s_port == rmKey.port) + if (p.s_port == rmKey.port && !(p instanceof sharedPortServer)) { p.close_now(); freed_ports.addFirst(new Integer(rmKey.port)); @@ -632,6 +714,41 @@ public class Functional_ORB } /** + * Notifies ORB that the shared service indentity (usually POA) + * is destroyed. The matching shared port server is terminated + * and the identity table entry is deleted. If this identity + * is not known for this ORB, the method returns without action. + * + * @param identity the identity that has been destroyed. + */ + public void identityDestroyed(java.lang.Object identity) + { + if (identity == null) + return; + + sharedPortServer ise = (sharedPortServer) identities.get(identity); + if (ise != null) + synchronized (connected_objects) + { + ise.close_now(); + identities.remove(identity); + + Connected_objects.cObject obj; + Map.Entry m; + Iterator iter = connected_objects.entrySet().iterator(); + while (iter.hasNext()) + { + m = (Map.Entry) iter.next(); + obj = (Connected_objects.cObject) m.getValue(); + if (obj.identity == identity) + { + iter.remove(); + } + } + } + } + + /** * Find the local object, connected to this ORB. * * @param ior the ior of the potentially local object. @@ -769,19 +886,31 @@ public class Functional_ORB m = (Map.Entry) iter.next(); obj = (Connected_objects.cObject) m.getValue(); - portServer subserver = new portServer(obj.port); - portServers.add(subserver); + portServer subserver; - // Reuse the current thread for the last portServer. - if (!iter.hasNext()) + if (obj.identity == null) { - // Discard the iterator, eliminating lock checks. - iter = null; - subserver.run(); - return; + subserver = new portServer(obj.port); + portServers.add(subserver); } else - subserver.start(); + { + subserver = (portServer) identities.get(obj.identity); + } + + if (!subserver.isAlive()) + { + // Reuse the current thread for the last portServer. + if (!iter.hasNext()) + { + // Discard the iterator, eliminating lock checks. + iter = null; + subserver.run(); + return; + } + else + subserver.start(); + } } } @@ -839,7 +968,7 @@ public class Functional_ORB } object = impl; - connected_objects.add(ior.key, impl, ior.Internet.port); + connected_objects.add(ior.key, impl, ior.Internet.port, null); } return object; } @@ -1029,10 +1158,12 @@ public class Functional_ORB private void prepareObject(org.omg.CORBA.Object object, IOR ior) throws BAD_PARAM { + /* if (!(object instanceof InvokeHandler)) throw new BAD_PARAM(object.getClass().getName() + " does not implement InvokeHandler. " ); + */ // If no delegate is set, set the default delegate. if (object instanceof ObjectImpl) @@ -1109,6 +1240,43 @@ public class Functional_ORB } /** + * Forward request to another target, as indicated by the passed + * exception. + */ + private void forward_request(OutputStream net_out, MessageHeader msh_request, + RequestHeader rh_request, gnuForwardRequest info + ) + throws IOException + { + MessageHeader msh_forward = new MessageHeader(); + msh_forward.version = msh_request.version; + + ReplyHeader rh_forward = msh_forward.create_reply_header(); + msh_forward.message_type = MessageHeader.REPLY; + rh_forward.reply_status = info.forwarding_code; + rh_forward.request_id = rh_request.request_id; + + // The forwarding code is either LOCATION_FORWARD or LOCATION_FORWARD_PERM. + cdrBufOutput out = new cdrBufOutput(); + out.setOrb(this); + out.setOffset(msh_forward.getHeaderSize()); + + rh_forward.write(out); + + if (msh_forward.version.since_inclusive(1, 2)) + out.align(8); + + out.write_Object(info.forward_reference); + + msh_forward.message_size = out.buffer.size(); + + // Write the forwarding instruction. + msh_forward.write(net_out); + out.buffer.writeTo(net_out); + net_out.flush(); + } + + /** * Contains a single servicing task. * * Normally, each task matches a single remote invocation. @@ -1127,9 +1295,10 @@ public class Functional_ORB service = serverSocket.accept(); // Tell the server there are no more resources. - while (p.running_threads >= MAX_RUNNING_THREADS) + if (p.running_threads >= MAX_RUNNING_THREADS) { serveStep(service, true); + return; } new Thread() @@ -1256,6 +1425,18 @@ public class Functional_ORB throw new OBJECT_NOT_EXIST(); target._invoke(rh_request.operation, cin, handler); } + catch (gnuForwardRequest forwarded) + { + OutputStream sou = service.getOutputStream(); + forward_request(sou, msh_request, rh_request, forwarded); + if (service != null && !service.isClosed()) + { + // Wait for the subsequent invocations on the + // same socket for the TANDEM_REQUEST duration. + service.setSoTimeout(TANDEM_REQUESTS); + continue Serving; + } + } catch (SystemException ex) { sysEx = ex; @@ -1266,6 +1447,7 @@ public class Functional_ORB } catch (Exception except) { + except.printStackTrace(); sysEx = new UNKNOWN("Unknown", 2, CompletionStatus.COMPLETED_MAYBE); @@ -1284,14 +1466,21 @@ public class Functional_ORB ); } } - else - ; + else if (msh_request.message_type == MessageHeader.CLOSE_CONNECTION || + msh_request.message_type == MessageHeader.MESSAGE_ERROR + ) + { + CloseMessage.close(service.getOutputStream()); + service.close(); + return; + } + ; // TODO log error: "Not a request message." if (service != null && !service.isClosed()) { // Wait for the subsequent invocations on the - // same socket for 2 minutes. + // same socket for the TANDEM_REQUEST duration. service.setSoTimeout(TANDEM_REQUESTS); } else |