diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-11 04:12:35 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-11 04:12:35 +0000 |
commit | 9072338cbfd971f21105c33cc2d83341e99ddcd7 (patch) | |
tree | c670719da79412a97e98ecfacc60a93278898c39 | |
parent | ba39b492dad25e56a1e07e0c8c898c80bd456d2f (diff) | |
download | ATCD-9072338cbfd971f21105c33cc2d83341e99ddcd7.tar.gz |
First attempt towards revamping the ORB for AMI model implementation.
Integrated all of Carlos' suggestions on to the architecture. First
tried to get the "Wait_On_Read" strategy to work. Not complete yet.
-rw-r--r-- | TAO/tao/Connect.cpp | 9 | ||||
-rw-r--r-- | TAO/tao/GIOP.cpp | 213 | ||||
-rw-r--r-- | TAO/tao/GIOP.h | 10 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 357 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.h | 225 | ||||
-rw-r--r-- | TAO/tao/Invocation.cpp | 48 | ||||
-rw-r--r-- | TAO/tao/Invocation.i | 2 |
7 files changed, 742 insertions, 122 deletions
diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp index aa379474670..9a6cbdaa708 100644 --- a/TAO/tao/Connect.cpp +++ b/TAO/tao/Connect.cpp @@ -875,8 +875,7 @@ TAO_Client_Connection_Handler::send_request (TAO_ORB_Core *, int TAO_Client_Connection_Handler::handle_input (ACE_HANDLE) { - errno = ENOTSUP; - return -1; + return this->transport_->handle_client_input (); } int @@ -1177,7 +1176,7 @@ TAO_MT_Client_Connection_Handler::send_request (TAO_ORB_Core *orb_core, // Save the ORB_Core for the handle_input callback... this->orb_core_ = orb_core; - // NOTE: Here would also be a fine place to calculate a digital + // NOTE: Here would also be a find place to calculate a digital // signature for the message and place it into a preallocated slot // in the "ServiceContext". Similarly, this is a good spot to // encrypt messages (or just the message bodies) if that's needed in @@ -1280,7 +1279,7 @@ TAO_MT_Client_Connection_Handler::send_request (TAO_ORB_Core *orb_core, // TAO_ORB_Core. this->orb_core_->set_leader_thread (); - // this might increase the recount of the leader + // this might increase the refcount of the leader if (this->orb_core_->leader_follower_lock ().release () == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -1334,7 +1333,7 @@ TAO_MT_Client_Connection_Handler::handle_input (ACE_HANDLE) if (!this->expecting_response_) { - // we got something, but did not want + // we got something, but did not want. // @@ wake up an other thread, we are lost if (this->orb_core_->leader_follower_lock ().release () == -1) diff --git a/TAO/tao/GIOP.cpp b/TAO/tao/GIOP.cpp index 278f33e6b2a..d17a3fc6968 100644 --- a/TAO/tao/GIOP.cpp +++ b/TAO/tao/GIOP.cpp @@ -68,8 +68,8 @@ static const char *TAO_GIOP_Timeprobe_Description[] = "GIOP::send_request - start", "GIOP::send_request - end", - "GIOP::recv_request - start", - "GIOP::recv_request - end", + "GIOP::recv_message - start", + "GIOP::recv_message - end", "GIOP::read_buffer - start", "GIOP::read_buffer - end", @@ -84,8 +84,8 @@ enum TAO_GIOP_SEND_REQUEST_START = 100, TAO_GIOP_SEND_REQUEST_END, - TAO_GIOP_RECV_REQUEST_START, - TAO_GIOP_RECV_REQUEST_END, + TAO_GIOP_RECV_MESSAGE_START, + TAO_GIOP_RECV_MESSAGE_END, TAO_GIOP_READ_BUFFER_START, TAO_GIOP_READ_BUFFER_END, @@ -436,13 +436,18 @@ TAO_GIOP::read_buffer (TAO_Transport *transport, // performance. The two read () calls can be made into one by fancy // buffering. How fast could it be with both optimizations applied? +// I am now making this call non-blocking. For reading the header it +// is not non-blocking. But for reading the rest of the message, it is +// non-blocking. Total size and the current offset of the incoming +// message is kept at the Transport class. TAO_GIOP::Message_Type -TAO_GIOP::recv_request (TAO_Transport *transport, +TAO_GIOP::recv_message (TAO_Transport *transport, TAO_InputCDR &msg, - TAO_ORB_Core* orb_core) + TAO_ORB_Core* orb_core, + int read_header) { - TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_RECV_REQUEST_START); - + TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_RECV_MESSAGE_START); + // Read the message header off the wire. // // THREADING NOTE: the connection manager handed us this connection @@ -452,100 +457,106 @@ TAO_GIOP::recv_request (TAO_Transport *transport, // near future) but makes less effective use of connection resources // as the "duty factor" goes down because of either long calls or // bursty contention during numerous short calls to the same server. - - ACE_CDR::mb_align (&msg.start_); - - ssize_t header_len = TAO_GIOP_HEADER_LEN; - - if (orb_core->orb_params ()->use_lite_protocol ()) - header_len = TAO_GIOP_LITE_HEADER_LEN; - - if (ACE_CDR::grow (&msg.start_, - header_len) == -1) - // This should probably be an exception. - return TAO_GIOP::CommunicationError; - - char *header = msg.start_.rd_ptr (); - ssize_t len = TAO_GIOP::read_buffer (transport, - header, - header_len); - // Read the header into the buffer. - - if (len != header_len) + + + if (read_header) { - switch (len) + // This is the first read for this message. + + ACE_CDR::mb_align (&msg.start_); + + ssize_t header_len = TAO_GIOP_HEADER_LEN; + + if (orb_core->orb_params ()->use_lite_protocol ()) + header_len = TAO_GIOP_LITE_HEADER_LEN; + + if (ACE_CDR::grow (&msg.start_, + header_len) == -1) + // This should probably be an exception. + return TAO_GIOP::CommunicationError; + + char *header = msg.start_.rd_ptr (); + ssize_t len = TAO_GIOP::read_buffer (transport, + header, + header_len); + // Read the header into the buffer. + + if (len != header_len) { - case 0: - if (TAO_orbdebug) - ACE_DEBUG ((LM_DEBUG, - "(%t) End of connection, transport handle %d\n", - transport->handle ())); - return TAO_GIOP::EndOfFile; - // @@ should probably find some way to report this without - // an exception, since for most servers it's not an error. - // Is it _never_ an error? Not sure ... - /* NOTREACHED */ - - case -1: // error - if (TAO_orbdebug) - ACE_DEBUG ((LM_ERROR, - "(%P|%t) GIOP::recv_request header %p\n", - "read_buffer")); - break; - /* NOTREACHED */ - - default: - if (TAO_orbdebug) - ACE_DEBUG ((LM_ERROR, - "(%P|%t) GIOP::recv_request header read failed, " - "only %d of %d bytes\n", - len, - header_len)); - break; - /* NOTREACHED */ - } - - return TAO_GIOP::CommunicationError; - } - - // NOTE: if message headers, or whole messages, get encrypted in - // application software (rather than by the network infrastructure) - // they should be decrypted here ... - - // First make sure it's a GIOP message of any version. - - TAO_GIOP::Message_Type retval; - CORBA::ULong message_size; - if (TAO_GIOP::parse_header (msg, - msg.do_byte_swap_, - retval, - message_size, - orb_core) == -1) - { - TAO_GIOP::send_error (transport); - // We didn't really receive anything useful here. - return TAO_GIOP::CommunicationError; - - } - - // Make sure we have the full length in memory, growing the buffer - // if needed. - // - // NOTE: We could overwrite these few bytes of header... they're - // left around for now as a debugging aid. - - assert (message_size <= UINT_MAX); - - if (ACE_CDR::grow (&msg.start_, - header_len + message_size) == -1) - return TAO_GIOP::CommunicationError; + switch (len) + { + case 0: + if (TAO_orbdebug) + ACE_DEBUG ((LM_DEBUG, + "(%t) End of connection, transport handle %d\n", + transport->handle ())); + return TAO_GIOP::EndOfFile; + // @@ should probably find some way to report this without + // an exception, since for most servers it's not an error. + // Is it _never_ an error? Not sure ... + /* NOTREACHED */ + + case -1: // error + if (TAO_orbdebug) + ACE_DEBUG ((LM_ERROR, + "(%P|%t) GIOP::recv_message header %p\n", + "read_buffer")); + break; + /* NOTREACHED */ + + default: + if (TAO_orbdebug) + ACE_DEBUG ((LM_ERROR, + "(%P|%t) GIOP::recv_message header read failed, " + "only %d of %d bytes\n", + len, + header_len)); + break; + /* NOTREACHED */ + } + + return TAO_GIOP::CommunicationError; + + // NOTE: if message headers, or whole messages, get encrypted in + // application software (rather than by the network infrastructure) + // they should be decrypted here ... + + // First make sure it's a GIOP message of any version. + + TAO_GIOP::Message_Type retval; + CORBA::ULong message_size; + if (TAO_GIOP::parse_header (msg, + msg.do_byte_swap_, + retval, + message_size, + orb_core) == -1) + { + TAO_GIOP::send_error (transport); + // We didn't really receive anything useful here. + return TAO_GIOP::CommunicationError; + } + + // Make sure we have the full length in memory, growing the + // buffer if needed. + // + // NOTE: We could overwrite these few bytes of header... they're + // left around for now as a debugging aid. + + assert (message_size <= UINT_MAX); + + if (ACE_CDR::grow (&msg.start_, + header_len + message_size) == -1) + return TAO_GIOP::CommunicationError; + + // Growing the buffer may have reset the rd_ptr(), but we want to + // leave it just after the GIOP header (that was parsed already); + ACE_CDR::mb_align (&msg.start_); + msg.start_.wr_ptr (header_len); + msg.start_.wr_ptr (message_size); + msg.start_.rd_ptr (header_len); + + // Keep the - // Growing the buffer may have reset the rd_ptr(), but we want to - // leave it just after the GIOP header (that was parsed already); - ACE_CDR::mb_align (&msg.start_); - msg.start_.wr_ptr (header_len); - msg.start_.wr_ptr (message_size); - msg.start_.rd_ptr (header_len); char* payload = msg.start_.rd_ptr (); @@ -570,7 +581,7 @@ TAO_GIOP::recv_request (TAO_Transport *transport, case -1: if (TAO_orbdebug) ACE_DEBUG ((LM_ERROR, - "(%P|%t) TAO_GIOP::recv_request - body %p\n", + "(%P|%t) TAO_GIOP::recv_message - body %p\n", "read_buffer")); break; /* NOTREACHED */ @@ -578,7 +589,7 @@ TAO_GIOP::recv_request (TAO_Transport *transport, default: if (TAO_orbdebug) ACE_DEBUG ((LM_ERROR, - "TAO: (%P|%t) GIOP::recv_request body read failed, " + "TAO: (%P|%t) GIOP::recv_message body read failed, " "only %d of %d bytes\n", len, message_size)); diff --git a/TAO/tao/GIOP.h b/TAO/tao/GIOP.h index 4c27a04b220..e19e0d7c870 100644 --- a/TAO/tao/GIOP.h +++ b/TAO/tao/GIOP.h @@ -303,7 +303,8 @@ public: // All GIOP messages include a header and message type. Not // really a message type, but needed to bring that information // back somehow. - + + ShortRead = -3, // Short read. CommunicationError = -2, // Invalid request. EndOfFile = -1, // "discovered" by either. Request = 0, // sent by client. @@ -329,10 +330,11 @@ public: TAO_ORB_Core* orb_core); // Send message, returns TRUE if success, else FALSE. - static TAO_GIOP::Message_Type recv_request (TAO_Transport *transport, + static TAO_GIOP::Message_Type recv_message (TAO_Transport *transport, TAO_InputCDR &msg, - TAO_ORB_Core *orb_core); - // Reads message, returns message type from header. + TAO_ORB_Core *orb_core, + TAO_GIOP_Version &version); + // Reads message returns message type from header. static void dump_msg (const char *label, const u_char *ptr, diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 13007ddc9f2..f1417069660 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -40,10 +40,21 @@ ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Transport_Timeprobe_Description, #endif /* ACE_ENABLE_TIMEPROBES */ -TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Handler_Base* handler) +TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Handler_Base* handler, + TAO_IIOP_Request_Multiplexing_Strategy *rms, + TAO_IIOP_Wait_Strategy *ws) : handler_(handler), - tag_(TAO_IOP_TAG_INTERNET_IOP) + tag_(TAO_IOP_TAG_INTERNET_IOP), + rms_ (rs), + ws_ (ws) { + // @@ I am creating the RMS strategy here. (alex) + ACE_NEW (rms_, + TAO_IIOP_Exclusive_RMS); + + // @@ Creating the WS here. (alex) + ACE_NEW (ws_, + TAO_Wait_On_Read); } TAO_IIOP_Transport::~TAO_IIOP_Transport (void) @@ -149,6 +160,121 @@ TAO_IIOP_Client_Transport::send_request (TAO_ORB_Core *orb_core, // return 1; // } +int +TAO_IIOP_Client_Transport::handle_client_input (int read_header) +{ + // When we multiplex several invocations over a connection we need + // to allocate the CDR stream *here*, but when there is a single + // request over a connection the CDR stream can be pre-allocated on + // the stack of the thread that sent the request! + // Can we preserve this optimization on the new architecture? + // + // here is how: + // + // Use an "factory" to obtain the CDR stream, in the Muxed case the + // factory simply allocates a new one, in the Exclusive case the + // factory returns a pointer to the pre-allocated CDR. + // + // Receive the message. Get also the GIOP version number. + // <recv_message> is non-blocking!!!! + // + // + In <recv_message>, Don't worry about blocking on the GIOP + // header, it's only 12 bytes, use read_n() for the header but + // non-blocking for the rest. + // + // + After reading the header you can allocate memory for the + // complete buffer [this is there already, look at how they + // do it!] + // + + TAO_InputCDR* cdr = + this->request_reply_strategy->get_cdr_stream (); + + TAO_GIOP_Version version; + + int message_type = + GIOP::recv_message (this, + cdr, + this->orb_core_, + version); + + switch (message_type) + { + case TAO_GIOP::ShortRead: + // Return a value so that this we will get called again to + // handle the input. + return 0; + // NOT REACHED. + + case TAO_GIOP::EndOfFile: + case TAO_GIOP::CommunicatioError: + case TAO_GIOP::MessageError: + // Handle errors like those... + this->reply_handler_->error (); + return 1; + + case TAO_GIOP::Request: + // In GIOP 1.0 and GIOP 1.1 this is an error, but it is + // *possible* to receive requests in GIOP 1.2. Don't handle this + // on the firt iteration, leave it for the nearby future... + // ERROR too. + this->reply_handler_->error (); + return 1; + + case TAO_GIOP::LocateReply: + case TAO_GIOP::Reply: + // Handle after the switch. + break; + } + + // For GIOP 1.0 and 1.1 the reply_ctx comes first: + TAO_GIOP_ServiceContextList reply_ctx; + this->inp_stream_ >> reply_ctx; + + // We should pass that reply_ctx to the invocation, interceptors + // will want to read it! + + // Read the request id and the reply status type. + // status can be NO_EXCEPTION, SYSTEM_EXCEPTION, USER_EXCEPTION, + // LOCATION_FORWARD or (on GIOP 1.2) LOCATION_FORWARD_PERM + + CORBA::ULong request_id; + CORBA::ULong reply_status; + + if (!this->inp_stream_.read_ulong (request_id)) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):TAO_IIOP_Client_Transport::handle_client_input: " + "Failed to read request_id.\n"), + -1); + + if (!this->inp_stream_.read_ulong (reply_status)) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):TAO_IIOP_Client_Transport::handle_client_input: " + "Failed to read request_status type.\n"), + -1); + + // Find the TAO_Reply_Handler for that request ID! + TAO_Reply_Handler* reply_handler = + this->request_reply_strategy->find_handler (request_id); + if (reply_handler == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):TAO_IIOP_Client_Transport::handle_client_input: " + "Failed to find Reply Handler.\n"), + -1); + + // Handle the reply. + if (reply_handler->handle_reply (reply_status, cdr) == -1) + return -1; + + // This is a NOOP for the Exclusive request case, but it actually + // destroys the stream in the muxed case. + this->request_multiplexing_stratetgy_->destroy_cdr_stream (cdr); + + // Return something to indicate the reply is received. + return 1; +} + + ssize_t TAO_IIOP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *s) { @@ -269,3 +395,230 @@ TAO_IIOP_Transport::recv (iovec *iov, ACE_UNUSED_ARG (s); return handler_->peer ().recvv_n (iov, iovcnt); } + +TAO_GIOP_ReplyStatusType +TAO_IIOP_Transport::wait_for_reply (CORBA::ULong request_id) +{ + return this->ws_->wait (request_id, + this->rms_); +} + +// ********************************************************************* + +TAO_IIOP_Request_Multiplexing_Strategy::TAO_IIOP_Request_Multiplexing_Strategy (void) +{ +} + +TAO_IIOP_Request_Multiplexing_Strategy::~TAO_IIOP_Request_Multiplexing_Strategy (void) +{ +} + +// ********************************************************************* + +TAO_IIOP_Muxed_RMS::TAO_IIOP_Muxed_RMS (void) +{ +} + +TAO_IIOP_Muxed_RMS::~TAO_IIOP_Muxed_RMS (void) +{ +} + +TAO_InputCDR * +TAO_IIOP_Muxed_RMS::get_cdr_stream (void) +{ + // @@ +} + +void +TAO_IIOP_Muxed_RMS::destroy_cdr_stream (TAO_InputCDR *cdr) +{ + delete cdr; + cdr = 0; +} + +// ********************************************************************* + +TAO_IIOP_Exclusive_RMS::TAO_IIOP_Exclusive_RMS (void) + : request_id_ (0), + rh_ (0) +{ +} + +TAO_IIOP_Exclusive_RMS::~TAO_IIOP_Exclusive_RMS (void) +{ +} + +// Bind the handler with the request id. +int +TAO_IIOP_Exclusive_RMS::bind_handler (CORBA::ULong request_id, + TAO_Reply_Handlern *rh) +{ + if (rh_ != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:TAO_IIOP_Exclusive_RMS::bind_handler: " + "Failed to bind the handler\n"), + -1); + this->request_id_ = request_id; + this->rh_ = rh; + return 0; +} + +// Find the Reply Handler. +TAO_Reply_Handler* +TAO_IIOP_Exclusive_RMS::find_handler (CORBA::ULong request_id) +{ + if (this->request_id_ != request_id) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:TAO_IIOP_Exclusive_RMS::find_handler: ", + "Failed to find the handler\n"), + 0); + + return this->rh_; +} + +// Return the preallocated CDR stream. +TAO_InputCDR * +TAO_IIOP_Exclusive_RMS::get_cdr_stream (void) +{ + return this->cdr_; +} + +// NOOP function. +void +TAO_IIOP_Exclusive_RMS::destroy_cdr_stream (TAO_InputCDR *) +{ +} + +// ********************************************************************* + +// Constructor. +TAO_Wait_On_Reactor::TAO_Wait_On_Reactor (void) +{ +} + +// Destructor. +TAO_Wait_On_Reactor::~TAO_Wait_On_Reactor (void) +{ +} + +TAO_GIOP_ReplyStatusType +TAO_Wait_On_Reactor::wait (CORBA::ULong request_id, + TAO_IIOP_Request_Multiplexing_Strategy *rms) +{ + int end_loop_flag = 0; + + TAO_Reactor_Reply_Handler rh (end_loop_flag); + + if (rrs->bind (request_id, &rh) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):TAO_Wait_On_Reactor::wait: " + "Failed to bind reply handler.\n"), + -1); + int result = 0; + while ((result != -1) && (end_loop_flag == 0)) + result = this->orb_core_->reactor ()->handle_events (/* timeout */); + + return 0; +} + +// ********************************************************************* + +// Constructor. +TAO_Wait_On_Leader_Follower::TAO_Wait_On_Leader_Follower (void) +{ +} + +// Destructor. +TAO_Wait_On_Leader_Follower::~TAO_Wait_On_Leader_Follower (void) +{ +} + + +TAO_GIOP_ReplyStatusType +TAO_Wait_On_Leader_Follower::wait (CORBA::ULong request_id, + TAO_IIOP_Request_Multiplexing_Strategy *rms) +{ + // Grab leader follower lock. + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + ace_mon, + this->orb_core_->leader_follower_lock (), + -1)); + + // Bind the reply handler. + + TAO_Leader_Follower_Reply_Handler rh (..); + + rrs->bind (this->request_id, &rh); + + // Check if we need to become the leader. + if (!this->orb_core_->leader_available ()) + { + // This might increase the refcount of the leader. + this->orb_core_->set_leader_thread (); + + // Do the reactor event loop. + this->orb_core_->reactor ()->owner (ACE_Thread::self ()); + + int result = 0; + while (result != -1) + result = this->orb_core_->reactor ()->handle_events (); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):TAO_Wait_On_Leader_Follower::wait: " + "Reactor::handle_events failed.\n"), + -1); + } + else + { + // Block on condition variable. + ACE_SYNCH_CONDITION* cond = + this->cond_response_available (orb_core); + if (this->orb_core_->add_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P|%t) TAO_Wait_On_Leader_Follower::wait: " + "Failed to add a follower thread\n")); + cond->wait (); + } + return 0; +} + +// ********************************************************************* + +// Constructor. +TAO_Wait_On_Read::TAO_Wait_On_Read (TAO_IIOP_Transport *transport) + : transport_ (transport) +{ +} + +// Destructor. +TAO_Wait_On_Read::~TAO_Wait_On_Read (void) +{ +} + +// Wait on the read operation. +int +TAO_Wait_On_Read::wait (CORBA::ULong request_id, + TAO_IIOP_Request_Multiplexing_Strategy *rms) +{ + TAO_Wait_On_Read_RH rh; + + // Bind the <request_id, handler> pair with the strategy. + if (rms->bind (request_id, &rh) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:TAO_Wait_On_Read::wait: " + "Failed to bind the Reply Handler"), + -1); + + int received_reply = 0; + while (!received_reply) + { + // In this case sockets *must* be blocking. + // We need to control how they are set! + received_reply = this->transport_->handle_client_input (); + } + + return 0; +} + + diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index 5168c633cc8..ec3567b0c85 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -37,7 +37,9 @@ class TAO_Export TAO_IIOP_Transport : public TAO_Transport // = DESCRIPTION // @@ Fred, please fill in here. public: - TAO_IIOP_Transport (TAO_IIOP_Handler_Base *handler); + TAO_IIOP_Transport (TAO_IIOP_Handler_Base *handler, + TAO_IIOP_Request_Multiplexing_Strategy *rms = 0, + TAO_IIOP_Wait_Strategy *ws = 0); // Base object's creator method. ~TAO_IIOP_Transport (void); @@ -105,6 +107,9 @@ public: int /* twoway */) { return -1; }; // Default action to be taken for send request. + virtual int wait_for_reply (CORBA::ULong request_id); + // Wait for the reply depending on the strategy. + protected: TAO_IIOP_Handler_Base *handler_; // the connection service handler used for accessing lower layer @@ -112,6 +117,22 @@ protected: CORBA::ULong tag_; // IIOP tag. + + TAO_IIOP_Request_Multiplexing_Strategy *request_reply_strategy_; + // Strategy to decide whether multiple requests can be sent over the + // same connection or the connection is exclusive for a request. + + TAO_IIOP_Wait_Strategy *wait_strategy_; + // Strategy for waiting for the reply after sending the request. + + // = States for the input message. + + size_t len_; + // Total length of the whole message. This does not include the + // header length. + + size_t offset_; + // Current offset of the input message. }; class TAO_Export TAO_IIOP_Client_Transport : public TAO_IIOP_Transport @@ -123,7 +144,9 @@ class TAO_Export TAO_IIOP_Client_Transport : public TAO_IIOP_Transport // = DESCRIPTION // @@ Fred, please fill in here. public: - TAO_IIOP_Client_Transport (TAO_Client_Connection_Handler *handler); + TAO_IIOP_Client_Transport (TAO_Client_Connection_Handler *handler, + TAO_IIOP_Request_Multiplexing_Strategy *rms = 0, + TAO_IIOP_Wait_Strategy *ws = 0);); // Constructor. Note, TAO_IIOP_Handler_Base is the base class for // both TAO_Client_Connection_Handler and // TAO_Server_Connection_Handler. @@ -142,6 +165,9 @@ public: // concurrency strategies, typically using the leader-follower // pattern. + int handle_client_input (void); + // Read and handle the reply. + private: TAO_Client_Connection_Handler *client_handler_; // pointer to the corresponding client side connection handler. @@ -174,4 +200,199 @@ private: // Pointer to the corresponding connection handler. }; + +class TAO_Export TAO_IIOP_Request_Multiplexing_Strategy +{ + // = TITLE + // + // Strategy to determine whether the connection should be + // multiplexed for multiple requests or it is exclusive for a + // single request at a time. + // + // = DESCRIPTION + // + +public: + TAO_IIOP_Request_Multiplexing_Strategy (void); + // Base class constructor. + + virtual ~TAO_IIOP_Request_Multiplexing_Strategy (void); + // Base class destructor. + + // = Bind and Find methods for the <Request ID, ReplyHandler> + // pairs. The ReplyHandler is not the CORBA ReplyHandler of the + // AMI's. + + virtual int bind_handler (CORBA::ULong request_id, + TAO_Reply_Handlern *rh) = 0; + // Bind the handler with the request id. + + virtual TAO_Reply_Handler* find_handler (CORBA::ULong request_id) = 0; + // Find the Reply Handler. + + // = "Factory methods" to obtain the CDR stream, in the Muxed case + // the factory simply allocates a new one, in the Exclusive case + // the factory returns a pointer to the pre-allocated CDR. + + virtual TAO_InputCDR *get_cdr_stream (void) = 0; + // Get the CDR stream. + + virtual void destroy_cdr_stream (TAO_InputCDR*) = 0; + // Destroy the CDR stream. +}; + +class TAO_Export TAO_IIOP_Muxed_RMS : public TAO_IIOP_Request_Multiplexing_Strategy +{ + // = TITLE + // + // Connection is multiplexed for many requests. + // + // = DESCRIPTION + // + +public: + TAO_IIOP_Muxed_RMS (void); + // Constructor. + + virtual ~TAO_IIOP_Muxed_RMS (void); + // Destructor. + + virtual int bind_handler (CORBA::ULong request_id, + TAO_Reply_Handlern *rh); + // Bind the handler with the request id. + + virtual TAO_Reply_Handler* find_handler (CORBA::ULong request_id); + // Find the Reply Handler. + + virtual TAO_InputCDR *get_cdr_stream (void); + // Create a new CDR stream and return. + + virtual void destroy_cdr_stream (TAO_InoutCDR *ccdr); + // Delete the cdr stream. +}; + +class TAO_Export TAO_IIOP_Exclusive_RMS : public TAO_IIOP_Request_Multiplexing_Strategy +{ + // = TITLE + // + // Connection exclusive for the request. + // + // = DESCRIPTION + // + +public: + TAO_IIOP_Exclusive_RMS (void); + // Constructor. + + virtual ~TAO_IIOP_Exclusive_RMS (void); + // Destructor. + + virtual int bind_handler (CORBA::ULong request_id, + TAO_Reply_Handlern *rh); + // Bind the handler with the request id. + + virtual TAO_Reply_Handler* find_handler (CORBA::ULong request_id); + // Find the Reply Handler. + + virtual TAO_InputCDR *get_cdr_stream (void); + // Return the preallocated CDR stream. + + virtual void destroy_cdr_stream (TAO_InputCDR *cdr); + // NOOP function. + +protected: + CORBA::ULong request_id_; + // Request id for the current request. + + TAO_Reply_Handler *rh_; + // Reply Handler corresponding to the request. + + TAO_InputCDR cdr_; + // @@ Preallocated CDR stream. +}; + +class TAO_Export TAO_IIOP_Wait_Strategy +{ + // = TITLE + // + // Strategy for waiting for the reply. + // + // = DESCRIPTION + // + +public: + TAO_IIOP_Wait_Strategy (void); + // Constructor. + + virtual ~TAO_IIOP_Wait_Strategy (void); + // Destructor. + + virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id, + TAO_IIOP_Request_Multiplexing_Strategy *rms) = 0; + // Base class virtual method +}; + +class TAO_Export TAO_Wait_On_Reactor : public TAO_IIOP_Wait_Strategy +{ + // = TITLE + // + // Wait on the Reactor. Happens in s Single Threaded client + // environment. + // + // = DESCRIPTION + // + +public: + TAO_Wait_On_Reactor (void); + // Constructor. + + virtual ~TAO_Wait_On_Reactor (void); + // Destructor. + + virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id, + TAO_IIOP_Request_Multiplexing_Strategy *rms); + // Do the event loop of the Reactor. +}; + +class TAO_Export TAO_Wait_On_Leader_Follower : public TAO_IIOP_Wait_Strategy +{ + // = TITLE + // + // Wait according to the Leader-Follower model. Leader does the + // event loop of the Reactor and the Followers wait on the + // condition variable. + // + // = DESCRIPTION + // + + TAO_Wait_On_Leader_Follower (void); + // Constructor. + + virtual ~TAO_Wait_On_Leader_Follower (void); + // Destructor. + + virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id, + TAO_IIOP_Request_Multiplexing_Strategy *rms); + // Wait according to the L-F model. +}; + +class TAO_Export TAO_Wait_On_Read : public TAO_IIOP_Wait_Strategy +{ + // = TITLE + // + // Wait on receiving the reply. + // + // = DESCRIPTION + // + + TAO_Wait_On_Read (void); + // Constructor. + + virtual ~TAO_Wait_On_Read (void); + // Destructor. + + virtual TAO_GIOP_ReplyStatusType wait (CORBA::ULong request_id, + TAO_IIOP_Request_Multiplexing_Strategy *rms); + // Wait on the read operation. +}; #endif /* TAO_IIOP_TRANSPORT_H */ diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index 25753edba5a..8734c4d4644 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -312,6 +312,8 @@ TAO_GIOP_Invocation::write_request_header } +// @@ Does this comment make sense?. We dont wait for reply, right? +// (alex) // Send request, block until any reply comes back, and unmarshal reply // parameters as appropriate. @@ -636,8 +638,9 @@ TAO_GIOP_Twoway_Invocation::invoke (TAO_Exception_Data *excepts, int TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException)) + ACE_THROW_SPEC ((CORBA::SystemException)) { + // Just send the request, without trying to wait for the reply. int retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV); ACE_CHECK_RETURN (retval); ACE_UNUSED_ARG (retval); @@ -677,7 +680,43 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) // according to POSIX, all C stack frames must also have their // (explicitly coded) handlers called. We assume a POSIX.1c/C/C++ // environment. + + // Get the reply status. + + TAO_GIOP_ReplyStatusType reply_status = + this->transport_->wait_for_reply (this->request_id_, + ACE_TRY_ENV); + ACE_CHECK_RETURN (reply_status); + + switch (reply_status) + { + case TAO_GIOP_NO_EXCEPTION: + // Return so that the STUB can demarshal the reply. + return TAO_INVOKE_OK; + // NOT REACHED. + + case TAO_GIOP_USER_EXCEPTION: + // Return so that the STUB can demarshal the user exception. + return TAO_INVOKE_EXCEPTION; + // NOTREACHED. + + case TAO_GIOP_SYSTEM_EXCEPTION: + { + // Demarshal the system exception and raise it! + return TAO_INVOKE_EXCEPTION; + } + // NOTREACHED. + case TAO_GIOP_LOCATION_FORWARD: + // Handle the forwarding and return so the stub restarts the + // request! + return this->location_forward (this->inp_stream_, ACE_TRY_ENV); + // NOT REACHED. + } + return 0; +} + +#if 0 // @@ Fred: if it makes sense to have a wrapper for send_request on // the TAO_Transport class then it should also make sense to have // one for recv_request(), right? @@ -688,12 +727,6 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) // suspend was called in TAO_Client_Connection_Handler::handle_input this->transport_->resume_connection (this->orb_core_->reactor ()); - switch (m) - { - case TAO_GIOP::Reply: - // The reply is handled at the end of this switch() statement. - break; - case TAO_GIOP::CloseConnection: // Try the same profile again, but open a new connection. // If that fails then we go to the next profile. @@ -890,6 +923,7 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) return TAO_INVOKE_EXCEPTION; } +#endif /* 0 */ // **************************************************************** diff --git a/TAO/tao/Invocation.i b/TAO/tao/Invocation.i index 93d9a9ca9d6..415ad873a06 100644 --- a/TAO/tao/Invocation.i +++ b/TAO/tao/Invocation.i @@ -77,7 +77,7 @@ TAO_GIOP_Oneway_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV) return TAO_GIOP_Invocation::invoke (0, ACE_TRY_ENV); } -// **************************************************************** +// ********************************************************************* ACE_INLINE TAO_GIOP_Locate_Request_Invocation:: |