summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-11 04:12:35 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-11 04:12:35 +0000
commit9072338cbfd971f21105c33cc2d83341e99ddcd7 (patch)
treec670719da79412a97e98ecfacc60a93278898c39
parentba39b492dad25e56a1e07e0c8c898c80bd456d2f (diff)
downloadATCD-9072338cbfd971f21105c33cc2d83341e99ddcd7.tar.gz
First attempt towards revamping the ORB for AMI model implementation.
Integrated all of Carlos' suggestions on to the architecture. First tried to get the "Wait_On_Read" strategy to work. Not complete yet.
-rw-r--r--TAO/tao/Connect.cpp9
-rw-r--r--TAO/tao/GIOP.cpp213
-rw-r--r--TAO/tao/GIOP.h10
-rw-r--r--TAO/tao/IIOP_Transport.cpp357
-rw-r--r--TAO/tao/IIOP_Transport.h225
-rw-r--r--TAO/tao/Invocation.cpp48
-rw-r--r--TAO/tao/Invocation.i2
7 files changed, 742 insertions, 122 deletions
diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp
index aa379474670..9a6cbdaa708 100644
--- a/TAO/tao/Connect.cpp
+++ b/TAO/tao/Connect.cpp
@@ -875,8 +875,7 @@ TAO_Client_Connection_Handler::send_request (TAO_ORB_Core *,
int
TAO_Client_Connection_Handler::handle_input (ACE_HANDLE)
{
- errno = ENOTSUP;
- return -1;
+ return this->transport_->handle_client_input ();
}
int
@@ -1177,7 +1176,7 @@ TAO_MT_Client_Connection_Handler::send_request (TAO_ORB_Core *orb_core,
// Save the ORB_Core for the handle_input callback...
this->orb_core_ = orb_core;
- // NOTE: Here would also be a fine place to calculate a digital
+ // NOTE: Here would also be a find place to calculate a digital
// signature for the message and place it into a preallocated slot
// in the "ServiceContext". Similarly, this is a good spot to
// encrypt messages (or just the message bodies) if that's needed in
@@ -1280,7 +1279,7 @@ TAO_MT_Client_Connection_Handler::send_request (TAO_ORB_Core *orb_core,
// TAO_ORB_Core.
this->orb_core_->set_leader_thread ();
- // this might increase the recount of the leader
+ // this might increase the refcount of the leader
if (this->orb_core_->leader_follower_lock ().release () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -1334,7 +1333,7 @@ TAO_MT_Client_Connection_Handler::handle_input (ACE_HANDLE)
if (!this->expecting_response_)
{
- // we got something, but did not want
+ // we got something, but did not want.
// @@ wake up an other thread, we are lost
if (this->orb_core_->leader_follower_lock ().release () == -1)
diff --git a/TAO/tao/GIOP.cpp b/TAO/tao/GIOP.cpp
index 278f33e6b2a..d17a3fc6968 100644
--- a/TAO/tao/GIOP.cpp
+++ b/TAO/tao/GIOP.cpp
@@ -68,8 +68,8 @@ static const char *TAO_GIOP_Timeprobe_Description[] =
"GIOP::send_request - start",
"GIOP::send_request - end",
- "GIOP::recv_request - start",
- "GIOP::recv_request - end",
+ "GIOP::recv_message - start",
+ "GIOP::recv_message - end",
"GIOP::read_buffer - start",
"GIOP::read_buffer - end",
@@ -84,8 +84,8 @@ enum
TAO_GIOP_SEND_REQUEST_START = 100,
TAO_GIOP_SEND_REQUEST_END,
- TAO_GIOP_RECV_REQUEST_START,
- TAO_GIOP_RECV_REQUEST_END,
+ TAO_GIOP_RECV_MESSAGE_START,
+ TAO_GIOP_RECV_MESSAGE_END,
TAO_GIOP_READ_BUFFER_START,
TAO_GIOP_READ_BUFFER_END,
@@ -436,13 +436,18 @@ TAO_GIOP::read_buffer (TAO_Transport *transport,
// performance. The two read () calls can be made into one by fancy
// buffering. How fast could it be with both optimizations applied?
+// I am now making this call non-blocking. For reading the header it
+// is not non-blocking. But for reading the rest of the message, it is
+// non-blocking. Total size and the current offset of the incoming
+// message is kept at the Transport class.
TAO_GIOP::Message_Type
-TAO_GIOP::recv_request (TAO_Transport *transport,
+TAO_GIOP::recv_message (TAO_Transport *transport,
TAO_InputCDR &msg,
- TAO_ORB_Core* orb_core)
+ TAO_ORB_Core* orb_core,
+ int read_header)
{
- TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_RECV_REQUEST_START);
-
+ TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_RECV_MESSAGE_START);
+
// Read the message header off the wire.
//
// THREADING NOTE: the connection manager handed us this connection
@@ -452,100 +457,106 @@ TAO_GIOP::recv_request (TAO_Transport *transport,
// near future) but makes less effective use of connection resources
// as the "duty factor" goes down because of either long calls or
// bursty contention during numerous short calls to the same server.
-
- ACE_CDR::mb_align (&msg.start_);
-
- ssize_t header_len = TAO_GIOP_HEADER_LEN;
-
- if (orb_core->orb_params ()->use_lite_protocol ())
- header_len = TAO_GIOP_LITE_HEADER_LEN;
-
- if (ACE_CDR::grow (&msg.start_,
- header_len) == -1)
- // This should probably be an exception.
- return TAO_GIOP::CommunicationError;
-
- char *header = msg.start_.rd_ptr ();
- ssize_t len = TAO_GIOP::read_buffer (transport,
- header,
- header_len);
- // Read the header into the buffer.
-
- if (len != header_len)
+
+
+ if (read_header)
{
- switch (len)
+ // This is the first read for this message.
+
+ ACE_CDR::mb_align (&msg.start_);
+
+ ssize_t header_len = TAO_GIOP_HEADER_LEN;
+
+ if (orb_core->orb_params ()->use_lite_protocol ())
+ header_len = TAO_GIOP_LITE_HEADER_LEN;
+
+ if (ACE_CDR::grow (&msg.start_,
+ header_len) == -1)
+ // This should probably be an exception.
+ return TAO_GIOP::CommunicationError;
+
+ char *header = msg.start_.rd_ptr ();
+ ssize_t len = TAO_GIOP::read_buffer (transport,
+ header,
+ header_len);
+ // Read the header into the buffer.
+
+ if (len != header_len)
{
- case 0:
- if (TAO_orbdebug)
- ACE_DEBUG ((LM_DEBUG,
- "(%t) End of connection, transport handle %d\n",
- transport->handle ()));
- return TAO_GIOP::EndOfFile;
- // @@ should probably find some way to report this without
- // an exception, since for most servers it's not an error.
- // Is it _never_ an error? Not sure ...
- /* NOTREACHED */
-
- case -1: // error
- if (TAO_orbdebug)
- ACE_DEBUG ((LM_ERROR,
- "(%P|%t) GIOP::recv_request header %p\n",
- "read_buffer"));
- break;
- /* NOTREACHED */
-
- default:
- if (TAO_orbdebug)
- ACE_DEBUG ((LM_ERROR,
- "(%P|%t) GIOP::recv_request header read failed, "
- "only %d of %d bytes\n",
- len,
- header_len));
- break;
- /* NOTREACHED */
- }
-
- return TAO_GIOP::CommunicationError;
- }
-
- // NOTE: if message headers, or whole messages, get encrypted in
- // application software (rather than by the network infrastructure)
- // they should be decrypted here ...
-
- // First make sure it's a GIOP message of any version.
-
- TAO_GIOP::Message_Type retval;
- CORBA::ULong message_size;
- if (TAO_GIOP::parse_header (msg,
- msg.do_byte_swap_,
- retval,
- message_size,
- orb_core) == -1)
- {
- TAO_GIOP::send_error (transport);
- // We didn't really receive anything useful here.
- return TAO_GIOP::CommunicationError;
-
- }
-
- // Make sure we have the full length in memory, growing the buffer
- // if needed.
- //
- // NOTE: We could overwrite these few bytes of header... they're
- // left around for now as a debugging aid.
-
- assert (message_size <= UINT_MAX);
-
- if (ACE_CDR::grow (&msg.start_,
- header_len + message_size) == -1)
- return TAO_GIOP::CommunicationError;
+ switch (len)
+ {
+ case 0:
+ if (TAO_orbdebug)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) End of connection, transport handle %d\n",
+ transport->handle ()));
+ return TAO_GIOP::EndOfFile;
+ // @@ should probably find some way to report this without
+ // an exception, since for most servers it's not an error.
+ // Is it _never_ an error? Not sure ...
+ /* NOTREACHED */
+
+ case -1: // error
+ if (TAO_orbdebug)
+ ACE_DEBUG ((LM_ERROR,
+ "(%P|%t) GIOP::recv_message header %p\n",
+ "read_buffer"));
+ break;
+ /* NOTREACHED */
+
+ default:
+ if (TAO_orbdebug)
+ ACE_DEBUG ((LM_ERROR,
+ "(%P|%t) GIOP::recv_message header read failed, "
+ "only %d of %d bytes\n",
+ len,
+ header_len));
+ break;
+ /* NOTREACHED */
+ }
+
+ return TAO_GIOP::CommunicationError;
+
+ // NOTE: if message headers, or whole messages, get encrypted in
+ // application software (rather than by the network infrastructure)
+ // they should be decrypted here ...
+
+ // First make sure it's a GIOP message of any version.
+
+ TAO_GIOP::Message_Type retval;
+ CORBA::ULong message_size;
+ if (TAO_GIOP::parse_header (msg,
+ msg.do_byte_swap_,
+ retval,
+ message_size,
+ orb_core) == -1)
+ {
+ TAO_GIOP::send_error (transport);
+ // We didn't really receive anything useful here.
+ return TAO_GIOP::CommunicationError;
+ }
+
+ // Make sure we have the full length in memory, growing the
+ // buffer if needed.
+ //
+ // NOTE: We could overwrite these few bytes of header... they're
+ // left around for now as a debugging aid.
+
+ assert (message_size <= UINT_MAX);
+
+ if (ACE_CDR::grow (&msg.start_,
+ header_len + message_size) == -1)
+ return TAO_GIOP::CommunicationError;
+
+ // Growing the buffer may have reset the rd_ptr(), but we want to
+ // leave it just after the GIOP header (that was parsed already);
+ ACE_CDR::mb_align (&msg.start_);
+ msg.start_.wr_ptr (header_len);
+ msg.start_.wr_ptr (message_size);
+ msg.start_.rd_ptr (header_len);
+
+ // Keep the
- // Growing the buffer may have reset the rd_ptr(), but we want to
- // leave it just after the GIOP header (that was parsed already);
- ACE_CDR::mb_align (&msg.start_);
- msg.start_.wr_ptr (header_len);
- msg.start_.wr_ptr (message_size);
- msg.start_.rd_ptr (header_len);
char* payload = msg.start_.rd_ptr ();
@@ -570,7 +581,7 @@ TAO_GIOP::recv_request (TAO_Transport *transport,
case -1:
if (TAO_orbdebug)
ACE_DEBUG ((LM_ERROR,
- "(%P|%t) TAO_GIOP::recv_request - body %p\n",
+ "(%P|%t) TAO_GIOP::recv_message - body %p\n",
"read_buffer"));
break;
/* NOTREACHED */
@@ -578,7 +589,7 @@ TAO_GIOP::recv_request (TAO_Transport *transport,
default:
if (TAO_orbdebug)
ACE_DEBUG ((LM_ERROR,
- "TAO: (%P|%t) GIOP::recv_request body read failed, "
+ "TAO: (%P|%t) GIOP::recv_message body read failed, "
"only %d of %d bytes\n",
len,
message_size));
diff --git a/TAO/tao/GIOP.h b/TAO/tao/GIOP.h
index 4c27a04b220..e19e0d7c870 100644
--- a/TAO/tao/GIOP.h
+++ b/TAO/tao/GIOP.h
@@ -303,7 +303,8 @@ public:
// All GIOP messages include a header and message type. Not
// really a message type, but needed to bring that information
// back somehow.
-
+
+ ShortRead = -3, // Short read.
CommunicationError = -2, // Invalid request.
EndOfFile = -1, // "discovered" by either.
Request = 0, // sent by client.
@@ -329,10 +330,11 @@ public:
TAO_ORB_Core* orb_core);
// Send message, returns TRUE if success, else FALSE.
- static TAO_GIOP::Message_Type recv_request (TAO_Transport *transport,
+ static TAO_GIOP::Message_Type recv_message (TAO_Transport *transport,
TAO_InputCDR &msg,
- TAO_ORB_Core *orb_core);
- // Reads message, returns message type from header.
+ TAO_ORB_Core *orb_core,
+ TAO_GIOP_Version &version);
+ // Reads message returns message type from header.
static void dump_msg (const char *label,
const u_char *ptr,
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 13007ddc9f2..f1417069660 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -40,10 +40,21 @@ ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Transport_Timeprobe_Description,
#endif /* ACE_ENABLE_TIMEPROBES */
-TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Handler_Base* handler)
+TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Handler_Base* handler,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms,
+ TAO_IIOP_Wait_Strategy *ws)
: handler_(handler),
- tag_(TAO_IOP_TAG_INTERNET_IOP)
+ tag_(TAO_IOP_TAG_INTERNET_IOP),
+ rms_ (rs),
+ ws_ (ws)
{
+ // @@ I am creating the RMS strategy here. (alex)
+ ACE_NEW (rms_,
+ TAO_IIOP_Exclusive_RMS);
+
+ // @@ Creating the WS here. (alex)
+ ACE_NEW (ws_,
+ TAO_Wait_On_Read);
}
TAO_IIOP_Transport::~TAO_IIOP_Transport (void)
@@ -149,6 +160,121 @@ TAO_IIOP_Client_Transport::send_request (TAO_ORB_Core *orb_core,
// return 1;
// }
+int
+TAO_IIOP_Client_Transport::handle_client_input (int read_header)
+{
+ // When we multiplex several invocations over a connection we need
+ // to allocate the CDR stream *here*, but when there is a single
+ // request over a connection the CDR stream can be pre-allocated on
+ // the stack of the thread that sent the request!
+ // Can we preserve this optimization on the new architecture?
+ //
+ // here is how:
+ //
+ // Use an "factory" to obtain the CDR stream, in the Muxed case the
+ // factory simply allocates a new one, in the Exclusive case the
+ // factory returns a pointer to the pre-allocated CDR.
+ //
+ // Receive the message. Get also the GIOP version number.
+ // <recv_message> is non-blocking!!!!
+ //
+ // + In <recv_message>, Don't worry about blocking on the GIOP
+ // header, it's only 12 bytes, use read_n() for the header but
+ // non-blocking for the rest.
+ //
+ // + After reading the header you can allocate memory for the
+ // complete buffer [this is there already, look at how they
+ // do it!]
+ //
+
+ TAO_InputCDR* cdr =
+ this->request_reply_strategy->get_cdr_stream ();
+
+ TAO_GIOP_Version version;
+
+ int message_type =
+ GIOP::recv_message (this,
+ cdr,
+ this->orb_core_,
+ version);
+
+ switch (message_type)
+ {
+ case TAO_GIOP::ShortRead:
+ // Return a value so that this we will get called again to
+ // handle the input.
+ return 0;
+ // NOT REACHED.
+
+ case TAO_GIOP::EndOfFile:
+ case TAO_GIOP::CommunicatioError:
+ case TAO_GIOP::MessageError:
+ // Handle errors like those...
+ this->reply_handler_->error ();
+ return 1;
+
+ case TAO_GIOP::Request:
+ // In GIOP 1.0 and GIOP 1.1 this is an error, but it is
+ // *possible* to receive requests in GIOP 1.2. Don't handle this
+ // on the firt iteration, leave it for the nearby future...
+ // ERROR too.
+ this->reply_handler_->error ();
+ return 1;
+
+ case TAO_GIOP::LocateReply:
+ case TAO_GIOP::Reply:
+ // Handle after the switch.
+ break;
+ }
+
+ // For GIOP 1.0 and 1.1 the reply_ctx comes first:
+ TAO_GIOP_ServiceContextList reply_ctx;
+ this->inp_stream_ >> reply_ctx;
+
+ // We should pass that reply_ctx to the invocation, interceptors
+ // will want to read it!
+
+ // Read the request id and the reply status type.
+ // status can be NO_EXCEPTION, SYSTEM_EXCEPTION, USER_EXCEPTION,
+ // LOCATION_FORWARD or (on GIOP 1.2) LOCATION_FORWARD_PERM
+
+ CORBA::ULong request_id;
+ CORBA::ULong reply_status;
+
+ if (!this->inp_stream_.read_ulong (request_id))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):TAO_IIOP_Client_Transport::handle_client_input: "
+ "Failed to read request_id.\n"),
+ -1);
+
+ if (!this->inp_stream_.read_ulong (reply_status))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):TAO_IIOP_Client_Transport::handle_client_input: "
+ "Failed to read request_status type.\n"),
+ -1);
+
+ // Find the TAO_Reply_Handler for that request ID!
+ TAO_Reply_Handler* reply_handler =
+ this->request_reply_strategy->find_handler (request_id);
+ if (reply_handler == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):TAO_IIOP_Client_Transport::handle_client_input: "
+ "Failed to find Reply Handler.\n"),
+ -1);
+
+ // Handle the reply.
+ if (reply_handler->handle_reply (reply_status, cdr) == -1)
+ return -1;
+
+ // This is a NOOP for the Exclusive request case, but it actually
+ // destroys the stream in the muxed case.
+ this->request_multiplexing_stratetgy_->destroy_cdr_stream (cdr);
+
+ // Return something to indicate the reply is received.
+ return 1;
+}
+
+
ssize_t
TAO_IIOP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *s)
{
@@ -269,3 +395,230 @@ TAO_IIOP_Transport::recv (iovec *iov,
ACE_UNUSED_ARG (s);
return handler_->peer ().recvv_n (iov, iovcnt);
}
+
+TAO_GIOP_ReplyStatusType
+TAO_IIOP_Transport::wait_for_reply (CORBA::ULong request_id)
+{
+ return this->ws_->wait (request_id,
+ this->rms_);
+}
+
+// *********************************************************************
+
+TAO_IIOP_Request_Multiplexing_Strategy::TAO_IIOP_Request_Multiplexing_Strategy (void)
+{
+}
+
+TAO_IIOP_Request_Multiplexing_Strategy::~TAO_IIOP_Request_Multiplexing_Strategy (void)
+{
+}
+
+// *********************************************************************
+
+TAO_IIOP_Muxed_RMS::TAO_IIOP_Muxed_RMS (void)
+{
+}
+
+TAO_IIOP_Muxed_RMS::~TAO_IIOP_Muxed_RMS (void)
+{
+}
+
+TAO_InputCDR *
+TAO_IIOP_Muxed_RMS::get_cdr_stream (void)
+{
+ // @@
+}
+
+void
+TAO_IIOP_Muxed_RMS::destroy_cdr_stream (TAO_InputCDR *cdr)
+{
+ delete cdr;
+ cdr = 0;
+}
+
+// *********************************************************************
+
+TAO_IIOP_Exclusive_RMS::TAO_IIOP_Exclusive_RMS (void)
+ : request_id_ (0),
+ rh_ (0)
+{
+}
+
+TAO_IIOP_Exclusive_RMS::~TAO_IIOP_Exclusive_RMS (void)
+{
+}
+
+// Bind the handler with the request id.
+int
+TAO_IIOP_Exclusive_RMS::bind_handler (CORBA::ULong request_id,
+ TAO_Reply_Handlern *rh)
+{
+ if (rh_ != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:TAO_IIOP_Exclusive_RMS::bind_handler: "
+ "Failed to bind the handler\n"),
+ -1);
+ this->request_id_ = request_id;
+ this->rh_ = rh;
+ return 0;
+}
+
+// Find the Reply Handler.
+TAO_Reply_Handler*
+TAO_IIOP_Exclusive_RMS::find_handler (CORBA::ULong request_id)
+{
+ if (this->request_id_ != request_id)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:TAO_IIOP_Exclusive_RMS::find_handler: ",
+ "Failed to find the handler\n"),
+ 0);
+
+ return this->rh_;
+}
+
+// Return the preallocated CDR stream.
+TAO_InputCDR *
+TAO_IIOP_Exclusive_RMS::get_cdr_stream (void)
+{
+ return this->cdr_;
+}
+
+// NOOP function.
+void
+TAO_IIOP_Exclusive_RMS::destroy_cdr_stream (TAO_InputCDR *)
+{
+}
+
+// *********************************************************************
+
+// Constructor.
+TAO_Wait_On_Reactor::TAO_Wait_On_Reactor (void)
+{
+}
+
+// Destructor.
+TAO_Wait_On_Reactor::~TAO_Wait_On_Reactor (void)
+{
+}
+
+TAO_GIOP_ReplyStatusType
+TAO_Wait_On_Reactor::wait (CORBA::ULong request_id,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms)
+{
+ int end_loop_flag = 0;
+
+ TAO_Reactor_Reply_Handler rh (end_loop_flag);
+
+ if (rrs->bind (request_id, &rh) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):TAO_Wait_On_Reactor::wait: "
+ "Failed to bind reply handler.\n"),
+ -1);
+ int result = 0;
+ while ((result != -1) && (end_loop_flag == 0))
+ result = this->orb_core_->reactor ()->handle_events (/* timeout */);
+
+ return 0;
+}
+
+// *********************************************************************
+
+// Constructor.
+TAO_Wait_On_Leader_Follower::TAO_Wait_On_Leader_Follower (void)
+{
+}
+
+// Destructor.
+TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void)
+{
+}
+
+
+TAO_GIOP_ReplyStatusType
+TAO_Wait_On_Leader_Follower::wait (CORBA::ULong request_id,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms)
+{
+ // Grab leader follower lock.
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
+ ace_mon,
+ this->orb_core_->leader_follower_lock (),
+ -1));
+
+ // Bind the reply handler.
+
+ TAO_Leader_Follower_Reply_Handler rh (..);
+
+ rrs->bind (this->request_id, &rh);
+
+ // Check if we need to become the leader.
+ if (!this->orb_core_->leader_available ())
+ {
+ // This might increase the refcount of the leader.
+ this->orb_core_->set_leader_thread ();
+
+ // Do the reactor event loop.
+ this->orb_core_->reactor ()->owner (ACE_Thread::self ());
+
+ int result = 0;
+ while (result != -1)
+ result = this->orb_core_->reactor ()->handle_events ();
+
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):TAO_Wait_On_Leader_Follower::wait: "
+ "Reactor::handle_events failed.\n"),
+ -1);
+ }
+ else
+ {
+ // Block on condition variable.
+ ACE_SYNCH_CONDITION* cond =
+ this->cond_response_available (orb_core);
+ if (this->orb_core_->add_follower (cond) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P|%t) TAO_Wait_On_Leader_Follower::wait: "
+ "Failed to add a follower thread\n"));
+ cond->wait ();
+ }
+ return 0;
+}
+
+// *********************************************************************
+
+// Constructor.
+TAO_Wait_On_Read::TAO_Wait_On_Read (TAO_IIOP_Transport *transport)
+ : transport_ (transport)
+{
+}
+
+// Destructor.
+TAO_Wait_On_Read::~TAO_Wait_On_Read (void)
+{
+}
+
+// Wait on the read operation.
+int
+TAO_Wait_On_Read::wait (CORBA::ULong request_id,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms)
+{
+ TAO_Wait_On_Read_RH rh;
+
+ // Bind the <request_id, handler> pair with the strategy.
+ if (rms->bind (request_id, &rh) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:TAO_Wait_On_Read::wait: "
+ "Failed to bind the Reply Handler"),
+ -1);
+
+ int received_reply = 0;
+ while (!received_reply)
+ {
+ // In this case sockets *must* be blocking.
+ // We need to control how they are set!
+ received_reply = this->transport_->handle_client_input ();
+ }
+
+ return 0;
+}
+
+
diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h
index 5168c633cc8..ec3567b0c85 100644
--- a/TAO/tao/IIOP_Transport.h
+++ b/TAO/tao/IIOP_Transport.h
@@ -37,7 +37,9 @@ class TAO_Export TAO_IIOP_Transport : public TAO_Transport
// = DESCRIPTION
// @@ Fred, please fill in here.
public:
- TAO_IIOP_Transport (TAO_IIOP_Handler_Base *handler);
+ TAO_IIOP_Transport (TAO_IIOP_Handler_Base *handler,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms = 0,
+ TAO_IIOP_Wait_Strategy *ws = 0);
// Base object's creator method.
~TAO_IIOP_Transport (void);
@@ -105,6 +107,9 @@ public:
int /* twoway */) { return -1; };
// Default action to be taken for send request.
+ virtual int wait_for_reply (CORBA::ULong request_id);
+ // Wait for the reply depending on the strategy.
+
protected:
TAO_IIOP_Handler_Base *handler_;
// the connection service handler used for accessing lower layer
@@ -112,6 +117,22 @@ protected:
CORBA::ULong tag_;
// IIOP tag.
+
+ TAO_IIOP_Request_Multiplexing_Strategy *request_reply_strategy_;
+ // Strategy to decide whether multiple requests can be sent over the
+ // same connection or the connection is exclusive for a request.
+
+ TAO_IIOP_Wait_Strategy *wait_strategy_;
+ // Strategy for waiting for the reply after sending the request.
+
+ // = States for the input message.
+
+ size_t len_;
+ // Total length of the whole message. This does not include the
+ // header length.
+
+ size_t offset_;
+ // Current offset of the input message.
};
class TAO_Export TAO_IIOP_Client_Transport : public TAO_IIOP_Transport
@@ -123,7 +144,9 @@ class TAO_Export TAO_IIOP_Client_Transport : public TAO_IIOP_Transport
// = DESCRIPTION
// @@ Fred, please fill in here.
public:
- TAO_IIOP_Client_Transport (TAO_Client_Connection_Handler *handler);
+ TAO_IIOP_Client_Transport (TAO_Client_Connection_Handler *handler,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms = 0,
+ TAO_IIOP_Wait_Strategy *ws = 0););
// Constructor. Note, TAO_IIOP_Handler_Base is the base class for
// both TAO_Client_Connection_Handler and
// TAO_Server_Connection_Handler.
@@ -142,6 +165,9 @@ public:
// concurrency strategies, typically using the leader-follower
// pattern.
+ int handle_client_input (void);
+ // Read and handle the reply.
+
private:
TAO_Client_Connection_Handler *client_handler_;
// pointer to the corresponding client side connection handler.
@@ -174,4 +200,199 @@ private:
// Pointer to the corresponding connection handler.
};
+
+class TAO_Export TAO_IIOP_Request_Multiplexing_Strategy
+{
+ // = TITLE
+ //
+ // Strategy to determine whether the connection should be
+ // multiplexed for multiple requests or it is exclusive for a
+ // single request at a time.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ TAO_IIOP_Request_Multiplexing_Strategy (void);
+ // Base class constructor.
+
+ virtual ~TAO_IIOP_Request_Multiplexing_Strategy (void);
+ // Base class destructor.
+
+ // = Bind and Find methods for the <Request ID, ReplyHandler>
+ // pairs. The ReplyHandler is not the CORBA ReplyHandler of the
+ // AMI's.
+
+ virtual int bind_handler (CORBA::ULong request_id,
+ TAO_Reply_Handlern *rh) = 0;
+ // Bind the handler with the request id.
+
+ virtual TAO_Reply_Handler* find_handler (CORBA::ULong request_id) = 0;
+ // Find the Reply Handler.
+
+ // = "Factory methods" to obtain the CDR stream, in the Muxed case
+ // the factory simply allocates a new one, in the Exclusive case
+ // the factory returns a pointer to the pre-allocated CDR.
+
+ virtual TAO_InputCDR *get_cdr_stream (void) = 0;
+ // Get the CDR stream.
+
+ virtual void destroy_cdr_stream (TAO_InputCDR*) = 0;
+ // Destroy the CDR stream.
+};
+
+class TAO_Export TAO_IIOP_Muxed_RMS : public TAO_IIOP_Request_Multiplexing_Strategy
+{
+ // = TITLE
+ //
+ // Connection is multiplexed for many requests.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ TAO_IIOP_Muxed_RMS (void);
+ // Constructor.
+
+ virtual ~TAO_IIOP_Muxed_RMS (void);
+ // Destructor.
+
+ virtual int bind_handler (CORBA::ULong request_id,
+ TAO_Reply_Handlern *rh);
+ // Bind the handler with the request id.
+
+ virtual TAO_Reply_Handler* find_handler (CORBA::ULong request_id);
+ // Find the Reply Handler.
+
+ virtual TAO_InputCDR *get_cdr_stream (void);
+ // Create a new CDR stream and return.
+
+ virtual void destroy_cdr_stream (TAO_InoutCDR *ccdr);
+ // Delete the cdr stream.
+};
+
+class TAO_Export TAO_IIOP_Exclusive_RMS : public TAO_IIOP_Request_Multiplexing_Strategy
+{
+ // = TITLE
+ //
+ // Connection exclusive for the request.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ TAO_IIOP_Exclusive_RMS (void);
+ // Constructor.
+
+ virtual ~TAO_IIOP_Exclusive_RMS (void);
+ // Destructor.
+
+ virtual int bind_handler (CORBA::ULong request_id,
+ TAO_Reply_Handlern *rh);
+ // Bind the handler with the request id.
+
+ virtual TAO_Reply_Handler* find_handler (CORBA::ULong request_id);
+ // Find the Reply Handler.
+
+ virtual TAO_InputCDR *get_cdr_stream (void);
+ // Return the preallocated CDR stream.
+
+ virtual void destroy_cdr_stream (TAO_InputCDR *cdr);
+ // NOOP function.
+
+protected:
+ CORBA::ULong request_id_;
+ // Request id for the current request.
+
+ TAO_Reply_Handler *rh_;
+ // Reply Handler corresponding to the request.
+
+ TAO_InputCDR cdr_;
+ // @@ Preallocated CDR stream.
+};
+
+class TAO_Export TAO_IIOP_Wait_Strategy
+{
+ // = TITLE
+ //
+ // Strategy for waiting for the reply.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ TAO_IIOP_Wait_Strategy (void);
+ // Constructor.
+
+ virtual ~TAO_IIOP_Wait_Strategy (void);
+ // Destructor.
+
+ virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms) = 0;
+ // Base class virtual method
+};
+
+class TAO_Export TAO_Wait_On_Reactor : public TAO_IIOP_Wait_Strategy
+{
+ // = TITLE
+ //
+ // Wait on the Reactor. Happens in s Single Threaded client
+ // environment.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ TAO_Wait_On_Reactor (void);
+ // Constructor.
+
+ virtual ~TAO_Wait_On_Reactor (void);
+ // Destructor.
+
+ virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms);
+ // Do the event loop of the Reactor.
+};
+
+class TAO_Export TAO_Wait_On_Leader_Follower : public TAO_IIOP_Wait_Strategy
+{
+ // = TITLE
+ //
+ // Wait according to the Leader-Follower model. Leader does the
+ // event loop of the Reactor and the Followers wait on the
+ // condition variable.
+ //
+ // = DESCRIPTION
+ //
+
+ TAO_Wait_On_Leader_Follower (void);
+ // Constructor.
+
+ virtual ~TAO_Wait_On_Leader_Follower (void);
+ // Destructor.
+
+ virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms);
+ // Wait according to the L-F model.
+};
+
+class TAO_Export TAO_Wait_On_Read : public TAO_IIOP_Wait_Strategy
+{
+ // = TITLE
+ //
+ // Wait on receiving the reply.
+ //
+ // = DESCRIPTION
+ //
+
+ TAO_Wait_On_Read (void);
+ // Constructor.
+
+ virtual ~TAO_Wait_On_Read (void);
+ // Destructor.
+
+ virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id,
+ TAO_IIOP_Request_Multiplexing_Strategy *rms);
+ // Wait on the read operation.
+};
#endif /* TAO_IIOP_TRANSPORT_H */
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index 25753edba5a..8734c4d4644 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -312,6 +312,8 @@ TAO_GIOP_Invocation::write_request_header
}
+// @@ Does this comment make sense?. We dont wait for reply, right?
+// (alex)
// Send request, block until any reply comes back, and unmarshal reply
// parameters as appropriate.
@@ -636,8 +638,9 @@ TAO_GIOP_Twoway_Invocation::invoke (TAO_Exception_Data *excepts,
int
TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV)
- ACE_THROW_SPEC ((CORBA::SystemException))
+ ACE_THROW_SPEC ((CORBA::SystemException))
{
+ // Just send the request, without trying to wait for the reply.
int retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV);
ACE_CHECK_RETURN (retval);
ACE_UNUSED_ARG (retval);
@@ -677,7 +680,43 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV)
// according to POSIX, all C stack frames must also have their
// (explicitly coded) handlers called. We assume a POSIX.1c/C/C++
// environment.
+
+ // Get the reply status.
+
+ TAO_GIOP_ReplyStatusType reply_status =
+ this->transport_->wait_for_reply (this->request_id_,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (reply_status);
+
+ switch (reply_status)
+ {
+ case TAO_GIOP_NO_EXCEPTION:
+ // Return so that the STUB can demarshal the reply.
+ return TAO_INVOKE_OK;
+ // NOT REACHED.
+
+ case TAO_GIOP_USER_EXCEPTION:
+ // Return so that the STUB can demarshal the user exception.
+ return TAO_INVOKE_EXCEPTION;
+ // NOTREACHED.
+
+ case TAO_GIOP_SYSTEM_EXCEPTION:
+ {
+ // Demarshal the system exception and raise it!
+ return TAO_INVOKE_EXCEPTION;
+ }
+ // NOTREACHED.
+ case TAO_GIOP_LOCATION_FORWARD:
+ // Handle the forwarding and return so the stub restarts the
+ // request!
+ return this->location_forward (this->inp_stream_, ACE_TRY_ENV);
+ // NOT REACHED.
+ }
+ return 0;
+}
+
+#if 0
// @@ Fred: if it makes sense to have a wrapper for send_request on
// the TAO_Transport class then it should also make sense to have
// one for recv_request(), right?
@@ -688,12 +727,6 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV)
// suspend was called in TAO_Client_Connection_Handler::handle_input
this->transport_->resume_connection (this->orb_core_->reactor ());
- switch (m)
- {
- case TAO_GIOP::Reply:
- // The reply is handled at the end of this switch() statement.
- break;
-
case TAO_GIOP::CloseConnection:
// Try the same profile again, but open a new connection.
// If that fails then we go to the next profile.
@@ -890,6 +923,7 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV)
return TAO_INVOKE_EXCEPTION;
}
+#endif /* 0 */
// ****************************************************************
diff --git a/TAO/tao/Invocation.i b/TAO/tao/Invocation.i
index 93d9a9ca9d6..415ad873a06 100644
--- a/TAO/tao/Invocation.i
+++ b/TAO/tao/Invocation.i
@@ -77,7 +77,7 @@ TAO_GIOP_Oneway_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV)
return TAO_GIOP_Invocation::invoke (0, ACE_TRY_ENV);
}
-// ****************************************************************
+// *********************************************************************
ACE_INLINE
TAO_GIOP_Locate_Request_Invocation::