summaryrefslogtreecommitdiff
path: root/gnu/CORBA/Functional_ORB.java
diff options
context:
space:
mode:
Diffstat (limited to 'gnu/CORBA/Functional_ORB.java')
-rw-r--r--gnu/CORBA/Functional_ORB.java311
1 files changed, 250 insertions, 61 deletions
diff --git a/gnu/CORBA/Functional_ORB.java b/gnu/CORBA/Functional_ORB.java
index 437f420c4..c8d6a4c93 100644
--- a/gnu/CORBA/Functional_ORB.java
+++ b/gnu/CORBA/Functional_ORB.java
@@ -15,8 +15,8 @@ General Public License for more details.
You should have received a copy of the GNU General Public License
along with GNU Classpath; see the file COPYING. If not, write to the
-Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
-02111-1307 USA.
+Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+02110-1301 USA.
Linking this library statically or dynamically with other modules is
making a combined work based on this library. Thus, the terms and
@@ -40,13 +40,14 @@ package gnu.CORBA;
import gnu.CORBA.CDR.cdrBufInput;
import gnu.CORBA.CDR.cdrBufOutput;
+import gnu.CORBA.GIOP.CloseMessage;
import gnu.CORBA.GIOP.ErrorMessage;
import gnu.CORBA.GIOP.MessageHeader;
import gnu.CORBA.GIOP.ReplyHeader;
import gnu.CORBA.GIOP.RequestHeader;
import gnu.CORBA.NamingService.NamingServiceTransient;
+import gnu.CORBA.Poa.gnuForwardRequest;
-import org.omg.CORBA.BAD_INV_ORDER;
import org.omg.CORBA.BAD_OPERATION;
import org.omg.CORBA.BAD_PARAM;
import org.omg.CORBA.CompletionStatus;
@@ -77,6 +78,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
@@ -86,7 +88,9 @@ import java.util.TreeMap;
/**
* The ORB implementation, capable to handle remote invocations on the
- * registered object.
+ * registered object. This class implements all features, required till
+ * the jdk 1.3 inclusive, but does not support the POA that appears since
+ * 1.4. The POA is supported by {@link gnu.CORBA.Poa.ORB_1_4}.
*
* @author Audrius Meskauskas (AudriusA@Bioinformatics.org)
*/
@@ -154,7 +158,7 @@ public class Functional_ORB
{
try
{
- serve(this, service);
+ tick();
}
catch (SocketException ex)
{
@@ -179,6 +183,16 @@ public class Functional_ORB
}
/**
+ * Perform a single serving step.
+ * @throws java.lang.Exception
+ */
+ void tick()
+ throws Exception
+ {
+ serve(this, service);
+ }
+
+ /**
* Forcibly close the server socket and mark this port as free.
*/
public void close_now()
@@ -205,6 +219,40 @@ public class Functional_ORB
}
/**
+ * A server, responsible for listening on requests on some
+ * local port and serving multiple requests (probably to the
+ * different objects) on the same thread.
+ */
+ class sharedPortServer
+ extends portServer
+ {
+ /**
+ * Create a new portServer, serving on specific port.
+ */
+ sharedPortServer(int _port)
+ {
+ super(_port);
+ }
+
+ /**
+ * Perform a single serving step.
+ * @throws java.lang.Exception
+ */
+ void tick()
+ throws Exception
+ {
+ Socket request = service.accept();
+ serveStep(request, false);
+ }
+ }
+
+ /**
+ * The default value where the first instance of this ORB will start
+ * looking for a free port.
+ */
+ public static int DEFAULT_INITIAL_PORT = 1126;
+
+ /**
* The property of port, on that this ORB is listening for requests from clients.
* This class supports one port per ORB only.
*/
@@ -266,7 +314,7 @@ public class Functional_ORB
*/
private int TOUT_START_READING_MESSAGE = 20 * 1000;
- // (Here and below, we use * to make meaning of the constant clearler).
+ // (Here and below, we use * to make the meaning of the constant clearler).
/**
* If the client has started to send the request message, the socket time
@@ -285,17 +333,11 @@ public class Functional_ORB
* Some clients tend to submit multiple requests over the
* same socket. The server waits for the next request on
* the same socket for the duration, specified
- * below. The default time is seven seconds.
- */
- public int TANDEM_REQUESTS = 7000;
-
- /**
- * If the maximal number of threads per object is reached,
- * the server waits for the given time interval before checking
- * again maybe some threads are already complete.
- * Thr default time is 0.5 second.
+ * below. In additions, the request of this implementation also
+ * waits for the same duration before closing the socket.
+ * The default time is seven seconds.
*/
- public int PAUSE_ON_THREAD_OVERLOAD = 500;
+ public static int TANDEM_REQUESTS = 7000;
/**
* The map of the already conncted objects.
@@ -318,7 +360,7 @@ public class Functional_ORB
/**
* The map of the initial references.
*/
- private Map initial_references = new TreeMap();
+ protected Map initial_references = new TreeMap();
/**
* The currently active portServers.
@@ -331,13 +373,14 @@ public class Functional_ORB
private String ns_host;
/**
- * The port, under that the ORB is listening for remote requests.
- * Then the new object is connected, this port is used first, then
- * it is incremented by 1, etc. If the given port is not available,
- * up to 20 subsequent values are tried and then the parameterless
- * server socket contructor is called.
+ * Probably free port, under that the ORB will try listening for
+ * remote requests first. When the new object is connected, this
+ * port is used first, then it is incremented by 1, etc. If the given
+ * port is not available, up to 20 subsequent values are tried and then
+ * the parameterless server socket contructor is called. The constant is
+ * shared between multiple instances of this ORB.
*/
- private static int Port = 1126;
+ private static int Port = DEFAULT_INITIAL_PORT;
/**
* The port, on that the name service is expected to be running.
@@ -356,6 +399,11 @@ public class Functional_ORB
protected LinkedList freed_ports = new LinkedList();
/**
+ * Maps a single-threaded POAs to they sharedPortServants.
+ */
+ protected Hashtable identities = new Hashtable();
+
+ /**
* The maximal allowed number of the currently running parallel
* threads per object. For security reasons, this is made private and
* unchangeable. After exceeding this limit, the NO_RESOURCES
@@ -371,6 +419,7 @@ public class Functional_ORB
try
{
LOCAL_HOST = ns_host = InetAddress.getLocalHost().getHostAddress();
+ initial_references.put("CodecFactory", new gnuCodecFactory(this));
}
catch (UnknownHostException ex)
{
@@ -471,31 +520,17 @@ public class Functional_ORB
/**
* Set the port, on that the server is listening for the client requests.
- * In this implementation, the server is listening at only one port,
- * the default value being 1126.
+ * If only one object is connected to the orb, the server will be
+ * try listening on this port first. It the port is busy, or if more
+ * objects are connected, the subsequent object will receive a larger
+ * port values, skipping unavailable ports, if required. The change
+ * applies globally.
*
* @param a_Port a port, on that the server is listening for requests.
- *
- * @throws BAD_INV_ORDER if the server has already been started. The port
- * can only be changed when the server is not yet started.
- */
- public void setPort(int a_Port)
- {
- if (running)
- throw new BAD_INV_ORDER("The server is running");
- this.Port = a_Port;
- }
-
- /**
- * Get the port, on that the server is listening for the client requests.
- * In this implementation, the server is listening at only one port,
- * the default value being 1126.
- *
- * @return the port.
*/
- public int getPort()
+ public static void setPort(int a_Port)
{
- return Port;
+ Port = a_Port;
}
/**
@@ -547,7 +582,8 @@ public class Functional_ORB
{
int a_port = getFreePort();
- Connected_objects.cObject ref = connected_objects.add(key, object, a_port);
+ Connected_objects.cObject ref =
+ connected_objects.add(key, object, a_port, null);
IOR ior = createIOR(ref);
prepareObject(object, ior);
if (running)
@@ -555,11 +591,57 @@ public class Functional_ORB
}
/**
+ * Connect the given CORBA object to this ORB, explicitly specifying
+ * the object key and the identity of the thread (and port), where the
+ * object must be served. The identity is normally the POA.
+ *
+ * The new port server will be started only if there is no one
+ * already running for the same identity. Otherwise, the task of
+ * the existing port server will be widened, including duty to serve
+ * the given object. All objects, connected to a single identity by
+ * this method, will process they requests subsequently in the same
+ * thread. The method is used when the expected number of the
+ * objects is too large to have a single port and thread per object.
+ * This method is used by POAs, having a single thread policy.
+ *
+ * @param object the object, must implement the {@link InvokeHandler})
+ * interface.
+ * @param key the object key, usually used to identify the object from
+ * remote side.
+ * @param port the port, where the object must be connected.
+ *
+ * @throws BAD_PARAM if the object does not implement the
+ * {@link InvokeHandler}).
+ */
+ public void connect_1_thread(org.omg.CORBA.Object object, byte[] key,
+ java.lang.Object identity
+ )
+ {
+ sharedPortServer shared = (sharedPortServer) identities.get(identity);
+ if (shared == null)
+ {
+ int a_port = getFreePort();
+ shared = new sharedPortServer(a_port);
+ identities.put(identity, shared);
+ if (running)
+ {
+ portServers.add(shared);
+ shared.start();
+ }
+ }
+
+ Connected_objects.cObject ref =
+ connected_objects.add(key, object, shared.s_port, identity);
+ IOR ior = createIOR(ref);
+ prepareObject(object, ior);
+ }
+
+ /**
* Start the service on the given port of this IOR.
*
* @param ior the ior (only Internet.port is used).
*/
- private void startService(IOR ior)
+ public void startService(IOR ior)
{
portServer p = new portServer(ior.Internet.port);
portServers.add(p);
@@ -620,7 +702,7 @@ public class Functional_ORB
for (int i = 0; i < portServers.size(); i++)
{
p = (portServer) portServers.get(i);
- if (p.s_port == rmKey.port)
+ if (p.s_port == rmKey.port && !(p instanceof sharedPortServer))
{
p.close_now();
freed_ports.addFirst(new Integer(rmKey.port));
@@ -632,6 +714,41 @@ public class Functional_ORB
}
/**
+ * Notifies ORB that the shared service indentity (usually POA)
+ * is destroyed. The matching shared port server is terminated
+ * and the identity table entry is deleted. If this identity
+ * is not known for this ORB, the method returns without action.
+ *
+ * @param identity the identity that has been destroyed.
+ */
+ public void identityDestroyed(java.lang.Object identity)
+ {
+ if (identity == null)
+ return;
+
+ sharedPortServer ise = (sharedPortServer) identities.get(identity);
+ if (ise != null)
+ synchronized (connected_objects)
+ {
+ ise.close_now();
+ identities.remove(identity);
+
+ Connected_objects.cObject obj;
+ Map.Entry m;
+ Iterator iter = connected_objects.entrySet().iterator();
+ while (iter.hasNext())
+ {
+ m = (Map.Entry) iter.next();
+ obj = (Connected_objects.cObject) m.getValue();
+ if (obj.identity == identity)
+ {
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ /**
* Find the local object, connected to this ORB.
*
* @param ior the ior of the potentially local object.
@@ -769,19 +886,31 @@ public class Functional_ORB
m = (Map.Entry) iter.next();
obj = (Connected_objects.cObject) m.getValue();
- portServer subserver = new portServer(obj.port);
- portServers.add(subserver);
+ portServer subserver;
- // Reuse the current thread for the last portServer.
- if (!iter.hasNext())
+ if (obj.identity == null)
{
- // Discard the iterator, eliminating lock checks.
- iter = null;
- subserver.run();
- return;
+ subserver = new portServer(obj.port);
+ portServers.add(subserver);
}
else
- subserver.start();
+ {
+ subserver = (portServer) identities.get(obj.identity);
+ }
+
+ if (!subserver.isAlive())
+ {
+ // Reuse the current thread for the last portServer.
+ if (!iter.hasNext())
+ {
+ // Discard the iterator, eliminating lock checks.
+ iter = null;
+ subserver.run();
+ return;
+ }
+ else
+ subserver.start();
+ }
}
}
@@ -839,7 +968,7 @@ public class Functional_ORB
}
object = impl;
- connected_objects.add(ior.key, impl, ior.Internet.port);
+ connected_objects.add(ior.key, impl, ior.Internet.port, null);
}
return object;
}
@@ -1029,10 +1158,12 @@ public class Functional_ORB
private void prepareObject(org.omg.CORBA.Object object, IOR ior)
throws BAD_PARAM
{
+ /*
if (!(object instanceof InvokeHandler))
throw new BAD_PARAM(object.getClass().getName() +
" does not implement InvokeHandler. "
);
+ */
// If no delegate is set, set the default delegate.
if (object instanceof ObjectImpl)
@@ -1109,6 +1240,43 @@ public class Functional_ORB
}
/**
+ * Forward request to another target, as indicated by the passed
+ * exception.
+ */
+ private void forward_request(OutputStream net_out, MessageHeader msh_request,
+ RequestHeader rh_request, gnuForwardRequest info
+ )
+ throws IOException
+ {
+ MessageHeader msh_forward = new MessageHeader();
+ msh_forward.version = msh_request.version;
+
+ ReplyHeader rh_forward = msh_forward.create_reply_header();
+ msh_forward.message_type = MessageHeader.REPLY;
+ rh_forward.reply_status = info.forwarding_code;
+ rh_forward.request_id = rh_request.request_id;
+
+ // The forwarding code is either LOCATION_FORWARD or LOCATION_FORWARD_PERM.
+ cdrBufOutput out = new cdrBufOutput();
+ out.setOrb(this);
+ out.setOffset(msh_forward.getHeaderSize());
+
+ rh_forward.write(out);
+
+ if (msh_forward.version.since_inclusive(1, 2))
+ out.align(8);
+
+ out.write_Object(info.forward_reference);
+
+ msh_forward.message_size = out.buffer.size();
+
+ // Write the forwarding instruction.
+ msh_forward.write(net_out);
+ out.buffer.writeTo(net_out);
+ net_out.flush();
+ }
+
+ /**
* Contains a single servicing task.
*
* Normally, each task matches a single remote invocation.
@@ -1127,9 +1295,10 @@ public class Functional_ORB
service = serverSocket.accept();
// Tell the server there are no more resources.
- while (p.running_threads >= MAX_RUNNING_THREADS)
+ if (p.running_threads >= MAX_RUNNING_THREADS)
{
serveStep(service, true);
+ return;
}
new Thread()
@@ -1256,6 +1425,18 @@ public class Functional_ORB
throw new OBJECT_NOT_EXIST();
target._invoke(rh_request.operation, cin, handler);
}
+ catch (gnuForwardRequest forwarded)
+ {
+ OutputStream sou = service.getOutputStream();
+ forward_request(sou, msh_request, rh_request, forwarded);
+ if (service != null && !service.isClosed())
+ {
+ // Wait for the subsequent invocations on the
+ // same socket for the TANDEM_REQUEST duration.
+ service.setSoTimeout(TANDEM_REQUESTS);
+ continue Serving;
+ }
+ }
catch (SystemException ex)
{
sysEx = ex;
@@ -1266,6 +1447,7 @@ public class Functional_ORB
}
catch (Exception except)
{
+ except.printStackTrace();
sysEx =
new UNKNOWN("Unknown", 2, CompletionStatus.COMPLETED_MAYBE);
@@ -1284,14 +1466,21 @@ public class Functional_ORB
);
}
}
- else
- ;
+ else if (msh_request.message_type == MessageHeader.CLOSE_CONNECTION ||
+ msh_request.message_type == MessageHeader.MESSAGE_ERROR
+ )
+ {
+ CloseMessage.close(service.getOutputStream());
+ service.close();
+ return;
+ }
+ ;
// TODO log error: "Not a request message."
if (service != null && !service.isClosed())
{
// Wait for the subsequent invocations on the
- // same socket for 2 minutes.
+ // same socket for the TANDEM_REQUEST duration.
service.setSoTimeout(TANDEM_REQUESTS);
}
else