diff options
Diffstat (limited to 'TAO/tao/Connect.cpp')
-rw-r--r-- | TAO/tao/Connect.cpp | 770 |
1 files changed, 170 insertions, 600 deletions
diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp index dd230de1c1a..2a069ec0963 100644 --- a/TAO/tao/Connect.cpp +++ b/TAO/tao/Connect.cpp @@ -7,9 +7,9 @@ #include "tao/GIOP.h" #include "tao/GIOP_Server_Request.h" #include "tao/ORB_Core.h" -#include "tao/ORB.h" #include "tao/POA.h" #include "tao/CDR.h" +#include "tao/Wait_Strategy.h" #if !defined (__ACE_INLINE__) # include "tao/Connect.i" @@ -81,31 +81,22 @@ TAO_IIOP_Handler_Base::resume_handler (ACE_Reactor *) return -1; } +// @@ For pluggable protocols, added a reference to the corresponding transport obj. TAO_Server_Connection_Handler::TAO_Server_Connection_Handler (ACE_Thread_Manager *t) : TAO_IIOP_Handler_Base (t ? t : TAO_ORB_Core_instance()->thr_mgr ()), - orb_core_ (TAO_ORB_Core_instance ()), - tss_resources_ (TAO_ORB_CORE_TSS_RESOURCES::instance ()) + orb_core_ (TAO_ORB_Core_instance ()) { iiop_transport_ = new TAO_IIOP_Server_Transport(this); } +// @@ For pluggable protocols, added a reference to the corresponding transport obj. TAO_Server_Connection_Handler::TAO_Server_Connection_Handler (TAO_ORB_Core *orb_core) : TAO_IIOP_Handler_Base (orb_core), - orb_core_ (orb_core), - tss_resources_ (TAO_ORB_CORE_TSS_RESOURCES::instance ()) + orb_core_ (orb_core) { iiop_transport_ = new TAO_IIOP_Server_Transport(this); } -TAO_Server_Connection_Handler::~TAO_Server_Connection_Handler (void) -{ - if (iiop_transport_) - { - delete iiop_transport_; - iiop_transport_ = 0; - } -} - TAO_Transport * TAO_Server_Connection_Handler::transport (void) { @@ -116,13 +107,6 @@ TAO_Server_Connection_Handler::transport (void) int TAO_Server_Connection_Handler::open (void*) { - // Called by the <Strategy_Acceptor> when the handler is completely - // connected. - ACE_INET_Addr addr; - - if (this->peer ().get_remote_addr (addr) == -1) - return -1; - #if !defined (ACE_LACKS_SOCKET_BUFSIZ) int sndbufsize = this->orb_core_->orb_params ()->sock_sndbuf_size (); @@ -157,15 +141,21 @@ TAO_Server_Connection_Handler::open (void*) // operation fails we are out of luck (some platforms do not support // it and return -1). - char client[MAXHOSTNAMELEN + 1]; + // Called by the <Strategy_Acceptor> when the handler is completely + // connected. + ACE_INET_Addr addr; - if (addr.get_host_name (client, MAXHOSTNAMELEN) == -1) - addr.addr_to_string (client, sizeof (client)); + if (this->peer ().get_remote_addr (addr) == -1) + return -1; - if (TAO_orbdebug) + char client[MAXHOSTNAMELEN + 16]; + + (void) addr.addr_to_string (client, sizeof (client)); + + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%P|%t) connection from client %s\n", - client)); + "(%P|%t) connection from client <%s> on %d\n", + client, this->peer ().get_handle ())); return 0; } @@ -221,8 +211,19 @@ TAO_Server_Connection_Handler::svc (void) // thread with this method as the "worker function". int result = 0; - // Inheriting the ORB_Core tss stuff from the parent thread. - this->orb_core_->inherit_from_parent_thread (this->tss_resources_); + // Inheriting the ORB_Core stuff from the parent thread. WARNING: + // this->orb_core_ is *not* the same as TAO_ORB_Core_instance(), + // this thread was just created and we are in fact *initializing* + // the ORB_Core based on the resources of the ORB that created + // us.... + + TAO_ORB_Core *tss_orb_core = TAO_ORB_Core_instance (); + tss_orb_core->inherit_from_parent_thread (this->orb_core_); + + // We need to change this->orb_core_ so it points to the TSS ORB + // Core, but we must preserve the old value + TAO_ORB_Core* old_orb_core = this->orb_core_; + this->orb_core_ = tss_orb_core; if (TAO_orbdebug) ACE_DEBUG ((LM_DEBUG, @@ -239,6 +240,8 @@ TAO_Server_Connection_Handler::svc (void) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_Server_Connection_Handler::svc end\n")); + this->orb_core_ = old_orb_core; + return result; } @@ -271,35 +274,34 @@ TAO_Server_Connection_Handler::handle_message (TAO_InputCDR &input, response_required = request.response_expected (); -#if !defined (TAO_NO_IOR_TABLE) - const CORBA::Octet *object_key = request.object_key ().get_buffer (); +#if !defined (TAO_NO_IOR_TABLE) if (ACE_OS::memcmp (object_key, - &TAO_POA::objectkey_prefix[0], - TAO_POA::TAO_OBJECTKEY_PREFIX_SIZE) != 0) + &TAO_POA::objectkey_prefix[0], + TAO_POA::TAO_OBJECTKEY_PREFIX_SIZE) != 0) { ACE_CString object_id (ACE_reinterpret_cast (const char *, object_key), - request.object_key ().length (), - 0, - 0); + TAO_POA::TAO_OBJECTKEY_PREFIX_SIZE, + 0, + 0); if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Simple Object key %s. Doing the Table Lookup ...\n", - object_id.c_str ())); + ACE_DEBUG ((LM_DEBUG, + "Simple Object key %s. Doing the Table Lookup ...\n", + object_id.c_str ())); CORBA::Object_ptr object_reference; // Do the Table Lookup. int status = - this->orb_core_->orb ()->_tao_find_in_IOR_table (object_id, - object_reference); + this->orb_core_->orb ()->_tao_find_in_IOR_table (object_id, + object_reference); // If ObjectID not in table or reference is nil raise OBJECT_NOT_EXIST. if (CORBA::is_nil (object_reference) || status == -1) - ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), -1); + ACE_THROW_RETURN (CORBA::OBJECT_NOT_EXIST (), -1); // ObjectID present in the table with an associated NON-NULL reference. // Throw a forward request exception. @@ -309,7 +311,7 @@ TAO_Server_Connection_Handler::handle_message (TAO_InputCDR &input, ACE_THROW_RETURN (PortableServer::ForwardRequest (dup), -1); } -#endif /* TAO_NO_IOR_TABLE */ +#endif // So, we read a request, now handle it using something more // primitive than a CORBA2 ServerRequest pseudo-object. @@ -372,6 +374,39 @@ TAO_Server_Connection_Handler::handle_locate (TAO_InputCDR &input, CORBA::Object_var forward_location_var; TAO_GIOP_LocateStatusType status; +// #if !defined (TAO_NO_IOR_TABLE) +// if (ACE_OS::memcmp (tmp_key.get_buffer (), +// &TAO_POA::objectkey_prefix[0], +// TAO_POA::TAO_OBJECTKEY_PREFIX_SIZE) == 0) +// { +// ACE_DEBUG ((LM_DEBUG, +// "TAO Object Key Prefix found in the object key.\n")); + + +// // Do the Table Lookup. Raise a location forward exception or +// // a non-exist exception. + +// // CORBA::Object_ptr object_reference; +// // int s = +// // table->lookup (request.object_key (), +// // object_reference); +// // if (s == -1) +// // { +// // status = TAO_GIOP_UNKNOWN_OBJECT; +// // } +// // else +// // { +// // status = TAO_GIOP_OBJECT_FORWARD; +// // forward_location_var = +// // CORBA::Object::_duplicate (object_reference); +// // } +// } +// // else +// // { +// #endif + + // this->handle_locate_i (....); + GIOP_ServerRequest serverRequest (locateRequestHeader.request_id, response_required, tmp_key, @@ -420,9 +455,6 @@ TAO_Server_Connection_Handler::handle_locate (TAO_InputCDR &input, } else { - -#if !defined (TAO_HAS_MINIMUM_CORBA) - // Try to narrow to ForwardRequest PortableServer::ForwardRequest_ptr forward_request_ptr = PortableServer::ForwardRequest::_narrow (env.exception ()); @@ -436,9 +468,6 @@ TAO_Server_Connection_Handler::handle_locate (TAO_InputCDR &input, "handle_locate has been called: forwarding\n")); } else - -#endif /* TAO_HAS_MINIMUM_CORBA */ - { // Normal exception, so the object is not here status = TAO_GIOP_UNKNOWN_OBJECT; @@ -454,6 +483,10 @@ TAO_Server_Connection_Handler::handle_locate (TAO_InputCDR &input, env.clear (); } +#if !defined (TAO_NO_IOR_TABLE) + // } +#endif + // Create the response. TAO_GIOP::start_message (TAO_GIOP::LocateReply, output, this->orb_core_); @@ -519,8 +552,6 @@ TAO_Server_Connection_Handler::send_error (CORBA::ULong request_id, // Write the request ID output.write_ulong (request_id); -#if !defined (TAO_HAS_MINIMUM_CORBA) - // @@ TODO This is the place to conditionally compile // forwarding. It certainly seems easy to strategize too, // just invoke an strategy to finish marshalling the @@ -547,14 +578,11 @@ TAO_Server_Connection_Handler::send_error (CORBA::ULong request_id, } // end of the forwarding code **************************** else - -#endif /* TAO_HAS_MINIMUM_CORBA */ - { // Write the exception CORBA::TypeCode_ptr except_tc = x->_type (); - CORBA::exception_type extype = CORBA::USER_EXCEPTION; + CORBA::ExceptionType extype = CORBA::USER_EXCEPTION; if (CORBA::SystemException::_narrow (x) != 0) extype = CORBA::SYSTEM_EXCEPTION; @@ -601,8 +629,7 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) // allocator. It is better to use a message block than a on stack // buffer because we cannot minimize memory copies in that case. TAO_InputCDR input (this->orb_core_->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - TAO_ENCAP_BYTE_ORDER, - this->orb_core_); + TAO_ENCAP_BYTE_ORDER); char repbuf[ACE_CDR::DEFAULT_BUFSIZE]; #if defined(ACE_HAS_PURIFY) @@ -611,20 +638,28 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) TAO_OutputCDR output (repbuf, sizeof(repbuf), TAO_ENCAP_BYTE_ORDER, this->orb_core_->output_cdr_buffer_allocator (), - this->orb_core_->output_cdr_dblock_allocator (), - this->orb_core_->orb_params ()->cdr_memcpy_tradeoff ()); + this->orb_core_->output_cdr_buffer_allocator ()); int result = 0; int error_encountered = 0; CORBA::Boolean response_required = 0; CORBA::ULong request_id = 0; + TAO_GIOP_Version version; - CORBA::Environment &ACE_TRY_ENV = CORBA::default_environment (); - ACE_TRY + ACE_TRY_NEW_ENV { // Try to recv a new request. + + // Init the input message states in Transport. + this->iiop_transport_->message_size (0); + + // Recv message. Block for it. TAO_GIOP::Message_Type type = - TAO_GIOP::recv_request (this->iiop_transport_, input, this->orb_core_); + TAO_GIOP::recv_message (this->iiop_transport_, + input, + this->orb_core_, + version, + 1); TAO_MINIMAL_TIMEPROBE (TAO_SERVER_CONNECTION_HANDLER_RECEIVE_REQUEST_END); @@ -699,7 +734,7 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) ACE_ERROR ((LM_ERROR, "(%P|%t) exception thrown " "but client is not waiting a response\n")); - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO: "); + ACE_TRY_ENV.print_exception (""); } // It is unfotunate that an exception (probably a system @@ -741,7 +776,7 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) { // No exception but some kind of error, yet a response is // required. - if (TAO_orbdebug) + if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, "TAO: (%P|%t) %s: closing conn, no exception, " "but expecting response\n", @@ -754,7 +789,7 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) { // No exception, no response expected, but an error ocurred, // close the socket. - if (TAO_orbdebug) + if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, "TAO: (%P|%t) %s: closing conn, no exception, " "but expecting response\n", @@ -768,24 +803,24 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) return result; } +// **************************************************************** + // @@ For pluggable protocols, added a reference to the corresponding // transport obj. TAO_Client_Connection_Handler::TAO_Client_Connection_Handler (ACE_Thread_Manager *t) - : TAO_IIOP_Handler_Base (t == 0 ? TAO_ORB_Core_instance ()->thr_mgr () : t), - expecting_response_ (0), - input_available_ (0) + : TAO_IIOP_Handler_Base (t == 0 ? TAO_ORB_Core_instance ()->thr_mgr () : t) { - iiop_transport_ = new TAO_IIOP_Client_Transport(this); + // @@ Alex: Allocate this on-demand and use the orb_core to create + // the strategies. + TAO_ORB_Core *orb_core = TAO_ORB_Core_instance (); + iiop_transport_ = new TAO_IIOP_Client_Transport (this, + orb_core); } -// @@ Need to get rid of the Transport Objects! TAO_Client_Connection_Handler::~TAO_Client_Connection_Handler (void) { - if (iiop_transport_) - { - delete this->iiop_transport_; - this->iiop_transport_ = 0; - } + delete this->iiop_transport_; + this->iiop_transport_ = 0; } TAO_Transport * @@ -794,9 +829,19 @@ TAO_Client_Connection_Handler::transport (void) return this->iiop_transport_; } +// @@ Should I do something here to enable non-blocking?? (Alex). +// @@ Alex: I don't know if this is the place to do it, but the way to +// do it is: +// if (this->peer ().enable (ACE_NONBLOCK) == -1) +// return -1; +// Probably we will need to use the transport to decide if it is +// needed or not. + int TAO_Client_Connection_Handler::open (void *) { + // @@ TODO: This flags should be set using the RT CORBA policies... + // Here is where we could enable all sorts of things such as // nonblock I/O, sock buf sizes, TCP no-delay, etc. @@ -837,80 +882,48 @@ TAO_Client_Connection_Handler::open (void *) // operation fails we are out of luck (some platforms do not support // it and return -1). - // For now, we just return success - return 0; -} + // Called by the <Strategy_Acceptor> when the handler is completely + // connected. + ACE_INET_Addr addr; -int -TAO_Client_Connection_Handler::send_request (TAO_ORB_Core *, - TAO_OutputCDR &, - int) -{ - errno = ENOTSUP; - return -1; -} + if (this->peer ().get_remote_addr (addr) == -1) + return -1; -int -TAO_Client_Connection_Handler::handle_input (ACE_HANDLE) -{ - errno = ENOTSUP; - return -1; + char server[MAXHOSTNAMELEN + 16]; + + (void) addr.addr_to_string (server, sizeof (server)); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connection to server <%s> on %d\n", + server, this->peer ().get_handle ())); + + // Register the handler with the Reactor if necessary. + return this->transport ()->wait_strategy ()->register_handler (); } int -TAO_Client_Connection_Handler::check_unexpected_data (void) +TAO_Client_Connection_Handler::handle_input (ACE_HANDLE) { - // We're a client, so we're not expecting to see input. Still we - // better check what it is! - char ignored; - ssize_t ret = this->peer().recv (&ignored, - sizeof ignored, - MSG_PEEK); - switch (ret) - { - case 0: - case -1: - // 0 is a graceful shutdown - // -1 is a somewhat ugly shutdown - // - // Both will result in us returning -1 and this connection - // getting closed - // - if (TAO_orbdebug) - ACE_DEBUG ((LM_WARNING, - "Client_Connection_Handler::handle_input: " - "closing connection on fd %d\n", - this->peer().get_handle ())); - break; - - case 1: - // - // @@ Fix me!! - // - // This should be the close connection message. Since we don't - // handle this yet, log an error, and close the connection. - ACE_ERROR ((LM_WARNING, - "Client_Connection_Handler::handle_input received " - "input while not expecting a response; " - "closing connection on fd %d\n", - this->peer().get_handle ())); - break; - } - - // We're not expecting input at this time, so we'll always - // return -1 for now. - return -1; + // Call the waiter to handle the input. + return this->transport ()->wait_strategy ()->handle_input (); } int TAO_Client_Connection_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask rm) { - if (TAO_orbdebug) + // @@ Alex: we need to figure out if the transport decides to close + // us or something else. If it is something else (for example + // the cached connector trying to make room for other + // connections) then we should let the transport know, so it can + // in turn take appropiate action (such as sending exceptions to + // all waiting reply handlers). + + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%P|%t) TAO_Client_Connection_Handler::handle_close (%d, %d)\n", - handle, - rm)); + "(%P|%t) TAO_Client_Connection_Handler::" + "handle_close (%d, %d)\n", handle, rm)); if (this->recycler ()) this->recycler ()->mark_as_closed (this->recycling_act ()); @@ -930,6 +943,8 @@ TAO_Client_Connection_Handler::handle_close (ACE_HANDLE handle, this->peer ().close (); + this->transport ()->handle_close (); + return 0; } @@ -943,471 +958,26 @@ TAO_Client_Connection_Handler::close (u_long) // **************************************************************** -TAO_RW_Client_Connection_Handler::TAO_RW_Client_Connection_Handler (ACE_Thread_Manager *t) - : TAO_Client_Connection_Handler (t) -{ -} - -TAO_RW_Client_Connection_Handler::~TAO_RW_Client_Connection_Handler (void) -{ -} - -int -TAO_RW_Client_Connection_Handler::send_request (TAO_ORB_Core* orb_core, - TAO_OutputCDR &stream, - int is_twoway) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_CLIENT_CONNECTION_HANDLER_SEND_REQUEST_START); - - // NOTE: Here would also be a fine place to calculate a digital - // signature for the message and place it into a preallocated slot - // in the "ServiceContext". Similarly, this is a good spot to - // encrypt messages (or just the message bodies) if that's needed in - // this particular environment and that isn't handled by the - // networking infrastructure (e.g. IPSEC). - // - // We could call a template method to do all this stuff, and if the - // connection handler were obtained from a factory, then this could - // be dynamically linked in (wouldn't that be cool/freaky?) - - // Send the request - int success = (int) TAO_GIOP::send_request (this->iiop_transport_, - stream, - orb_core); - TAO_MINIMAL_TIMEPROBE (GIOP_SEND_REQUEST_RETURN); - - if (!success) - return -1; - - return 0; -} - -int -TAO_RW_Client_Connection_Handler::resume_handler (ACE_Reactor *) -{ - // Since we don't suspend, we don't have to resume. - return 0; -} - -// **************************************************************** - -TAO_ST_Client_Connection_Handler::TAO_ST_Client_Connection_Handler (ACE_Thread_Manager *t) - : TAO_Client_Connection_Handler (t) -{ -} - -TAO_ST_Client_Connection_Handler::~TAO_ST_Client_Connection_Handler (void) -{ -} - -int -TAO_ST_Client_Connection_Handler::open (void *something) -{ - int result = TAO_Client_Connection_Handler::open (something); - - if (result != 0) - return result; - - // Now we must register ourselves with the reactor for input events - // which will detect GIOP Reply messages and EOF conditions. - ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor (); - return r->register_handler (this, - ACE_Event_Handler::READ_MASK); -} - -// @@ this seems odd that the connection handler would call methods in the -// GIOP object. Some of this mothod's functionality should be moved -// to GIOP. fredk -int -TAO_ST_Client_Connection_Handler::send_request (TAO_ORB_Core* orb_core, - TAO_OutputCDR &stream, - int is_twoway) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_CLIENT_CONNECTION_HANDLER_SEND_REQUEST_START); - - // NOTE: Here would also be a fine place to calculate a digital - // signature for the message and place it into a preallocated slot - // in the "ServiceContext". Similarly, this is a good spot to - // encrypt messages (or just the message bodies) if that's needed in - // this particular environment and that isn't handled by the - // networking infrastructure (e.g. IPSEC). - // - // We could call a template method to do all this stuff, and if the - // connection handler were obtained from a factory, then this could - // be dynamically linked in (wouldn't that be cool/freaky?) - - // Send the request - int success = (int) TAO_GIOP::send_request (this->iiop_transport_, - stream, - orb_core); - TAO_MINIMAL_TIMEPROBE (GIOP_SEND_REQUEST_RETURN); - - if (!success) - return -1; - - if (is_twoway) - { - // Set the state so that we know we're looking for a response. - this->expecting_response_ = 1; - - // Go into a loop, waiting until it's safe to try to read - // something on the socket. The handle_input() method doesn't - // actualy do the read, though, proper behavior based on what is - // read may be different if we're not using GIOP above here. - // So, we leave the reading of the response to the caller of - // this method, and simply insure that this method doesn't - // return until such time as doing a recv() on the socket would - // actually produce fruit. - ACE_Reactor *r = orb_core->reactor (); - - int ret = 0; - - while (ret != -1 && ! this->input_available_) - ret = r->handle_events (); - - this->input_available_ = 0; - // We can get events now, b/c we want them! - - // We're no longer expecting a response! - this->expecting_response_ = 0; - } - - return 0; -} - -int -TAO_ST_Client_Connection_Handler::handle_input (ACE_HANDLE) -{ - int retval = 0; - - if (this->expecting_response_) - { - this->input_available_ = 1; - // Temporarily remove ourself from notification so that if - // another sub event loop is in effect still waiting for its - // response, it doesn't spin tightly gobbling up CPU. - TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); - } - else - retval = this->check_unexpected_data (); - - return retval; -} - -int -TAO_ST_Client_Connection_Handler::resume_handler (ACE_Reactor *reactor) -{ - return reactor->resume_handler (this); -} - -// **************************************************************** - -TAO_MT_Client_Connection_Handler::TAO_MT_Client_Connection_Handler (ACE_Thread_Manager *t) - : TAO_Client_Connection_Handler (t), - calling_thread_ (ACE_OS::NULL_thread), - cond_response_available_ (0), - orb_core_ (0) -{ -} - -TAO_MT_Client_Connection_Handler::~TAO_MT_Client_Connection_Handler (void) -{ - delete this->cond_response_available_; -} - -int -TAO_MT_Client_Connection_Handler::open (void *something) -{ - int result = TAO_Client_Connection_Handler::open (something); - - if (result != 0) - return result; - - // Now we must register ourselves with the reactor for input events - // which will detect GIOP Reply messages and EOF conditions. - ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor (); - return r->register_handler (this, - ACE_Event_Handler::READ_MASK); -} - -ACE_SYNCH_CONDITION* -TAO_MT_Client_Connection_Handler::cond_response_available (TAO_ORB_Core* orb_core) -{ - // @@ TODO This condition variable should per-ORB-per-thread, not - // per-connection, it is a waste to have more than one of this in - // the same thread. - if (this->cond_response_available_ == 0) - { - ACE_NEW_RETURN (this->cond_response_available_, - ACE_SYNCH_CONDITION (orb_core->leader_follower_lock ()), - 0); - } - return this->cond_response_available_; -} - -int -TAO_MT_Client_Connection_Handler::send_request (TAO_ORB_Core *orb_core, - TAO_OutputCDR &stream, - int is_twoway) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_CLIENT_CONNECTION_HANDLER_SEND_REQUEST_START); - - // Save the ORB_Core for the handle_input callback... - this->orb_core_ = orb_core; - - // NOTE: Here would also be a fine place to calculate a digital - // signature for the message and place it into a preallocated slot - // in the "ServiceContext". Similarly, this is a good spot to - // encrypt messages (or just the message bodies) if that's needed in - // this particular environment and that isn't handled by the - // networking infrastructure (e.g. IPSEC). - // - // We could call a template method to do all this stuff, and if the - // connection handler were obtained from a factory, then this could - // be dynamically linked in (wouldn't that be cool/freaky?) - - if (!is_twoway) - { - // Send the request - int success = (int) TAO_GIOP::send_request (this->iiop_transport_, - stream, - this->orb_core_); - - TAO_MINIMAL_TIMEPROBE (GIOP_SEND_REQUEST_RETURN); - - if (!success) - return -1; - } - else // is_twoway - { - ACE_Reactor *r = this->orb_core_->reactor (); - - if (this->reactor () != r) - { - ACE_Reactor_Mask mask = - ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; - this->reactor ()->remove_handler (this, mask); - - r->register_handler (this, - ACE_Event_Handler::READ_MASK); - } - - if (this->orb_core_->leader_follower_lock ().acquire() == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to get the lock.\n"), - -1); - - // Set the state so that we know we're looking for a response. - this->expecting_response_ = 1; - // remember in which thread the client connection handler was running - this->calling_thread_ = ACE_Thread::self (); - - // Send the request - int success = (int) TAO_GIOP::send_request (this->iiop_transport_, - stream, - orb_core); - - TAO_MINIMAL_TIMEPROBE (GIOP_SEND_REQUEST_RETURN); - - if (!success) - { - this->orb_core_->leader_follower_lock ().release (); - return -1; - } - - // check if there is a leader, but the leader is not us - if (this->orb_core_->leader_available () && - !this->orb_core_->I_am_the_leader_thread ()) - { - // wait as long as no input is available and/or - // no leader is available - while (!this->input_available_ && - this->orb_core_->leader_available ()) - { - ACE_SYNCH_CONDITION* cond = - this->cond_response_available (orb_core); - if (this->orb_core_->add_follower (cond) == -1) - ACE_ERROR ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to add a follower thread\n")); - cond->wait (); - } - // now somebody woke us up to become a leader or to handle - // our input. We are already removed from the follower queue - if (this->input_available_) - { - // there is input waiting for me - if (this->orb_core_->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to release the lock.\n"), - -1); - // The following variables are safe, because we are not - // registered with the reactor any more. - this->input_available_ = 0; - this->expecting_response_ = 0; - this->calling_thread_ = ACE_OS::NULL_thread; - return 0; - } - } - - // Become a leader, because there is no leader or we have to - // update to a leader or we are doing nested upcalls in this - // case we do increase the refcount on the leader in - // TAO_ORB_Core. - - this->orb_core_->set_leader_thread (); - // this might increase the recount of the leader - - if (this->orb_core_->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to release the lock.\n"), - -1); - - r->owner (ACE_Thread::self ()); - - int ret = 0; - - while (ret != -1 && !this->input_available_) - ret = r->handle_events (); - - if (ret == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "handle_events failed.\n"), - -1); - - // Wake up the next leader, we cannot do that in handle_input, - // because the woken up thread would try to get into - // handle_events, which is at the time in handle_input still - // occupied. - - if (this->orb_core_->unset_leader_wake_up_follower () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::send_request: " - "Failed to unset the leader and wake up a new follower.\n"), - -1); - // Make use reusable - this->input_available_ = 0; - this->expecting_response_ = 0; - this->calling_thread_ = ACE_OS::NULL_thread; - } - - return 0; -} - -int -TAO_MT_Client_Connection_Handler::handle_input (ACE_HANDLE) -{ - if (this->orb_core_ == 0) - this->orb_core_ = TAO_ORB_Core_instance (); - - if (this->orb_core_->leader_follower_lock ().acquire () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to get the lock.\n"), - -1); - - if (!this->expecting_response_) - { - // we got something, but did not want - // @@ wake up an other thread, we are lost - - if (this->orb_core_->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - return this->check_unexpected_data (); - } - - if (ACE_OS::thr_equal (this->calling_thread_, - ACE_Thread::self ())) - { - // We are now a leader getting its response. - this->input_available_ = 1; - - if (this->orb_core_->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - this->orb_core_->reactor ()->suspend_handler (this); - // resume_handler is called in TAO_GIOP_Invocation::invoke - return 0; - } - else - { - // We are a leader, which got a response for one of the - // followers, which means we are now a thread running the wrong - // Client_Connection_Handler - - // At this point we might fail to remove the follower, because - // it has been already chosen to become the leader, so it is - // awake and will get this too. - ACE_SYNCH_CONDITION* cond = - this->cond_response_available (this->orb_core_); - - this->orb_core_->remove_follower (cond); - - if (this->orb_core_->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - - this->orb_core_->reactor ()->suspend_handler (this); - // We should wake suspend the thread before we wake him up. - // resume_handler is called in TAO_GIOP_Invocation::invoke - - // @@ TODO (Michael): We might be able to optimize this in - // doing the suspend_handler as last thing, but I am not sure - // if a race condition would occur. - - if (this->orb_core_->leader_follower_lock ().acquire () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to acquire the lock.\n"), - -1); - // The thread was already selected to become a leader, so we - // will be called again. - this->input_available_ = 1; - cond->signal (); - - if (this->orb_core_->leader_follower_lock ().release () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) TAO_Client_Connection_Handler::handle_input: " - "Failed to release the lock.\n"), - -1); - return 0; - } -} - -int -TAO_MT_Client_Connection_Handler::resume_handler (ACE_Reactor *reactor) -{ - return reactor->resume_handler (this); -} - -// **************************************************************** - -// @@ Fred&Ossama: Could somebody please check up the location of -// these template instantiations? For example the Hash_Map from -// ACE_INET_Addr to TAO_Object_Adapter does not seems to belong in -// this file, maybe in ORB.cpp??? - +#define TAO_SVC_TUPLE ACE_Svc_Tuple<TAO_Client_Connection_Handler> +#define CACHED_CONNECT_STRATEGY ACE_Cached_Connect_Strategy<TAO_Client_Connection_Handler, TAO_SOCK_CONNECTOR, TAO_Cached_Connector_Lock> +#define REFCOUNTED_HASH_RECYCLABLE_ADDR ACE_Refcounted_Hash_Recyclable<ACE_INET_Addr> #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - - -template class ACE_Unbounded_Set<ACE_INET_Addr>; -template class ACE_Unbounded_Set_Iterator<ACE_INET_Addr>; - +template class ACE_Svc_Handler<TAO_SOCK_STREAM, ACE_NULL_SYNCH>; +template class REFCOUNTED_HASH_RECYCLABLE_ADDR; +template class TAO_SVC_TUPLE; +template class ACE_Map_Manager<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Iterator_Base<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Iterator<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Reverse_Iterator<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Entry<int, TAO_SVC_TUPLE*>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) - -#pragma instantiate ACE_Unbounded_Set<ACE_INET_Addr> -#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_INET_Addr> - +#pragma instantiate ACE_Svc_Handler<TAO_SOCK_STREAM, ACE_NULL_SYNCH> +#pragma instantiate REFCOUNTED_HASH_RECYCLABLE_ADDR +#pragma instantiate TAO_SVC_TUPLE +#pragma instantiate ACE_Map_Manager<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Iterator_Base<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Iterator<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Reverse_Iterator<int, TAO_SVC_TUPLE*, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Entry<int, TAO_SVC_TUPLE*> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |