diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 417 |
1 files changed, 78 insertions, 339 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index e07169bb109..6cfccf24bab 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -20,7 +20,6 @@ #include "Thread_Lane_Resources.h" #include "debug.h" #include "Resume_Handle.h" -#include "Notify_Handler.h" #include "ace/Message_Block.h" #include "ace/Reactor.h" @@ -96,16 +95,9 @@ dump_iov (iovec *iov, int iovcnt, size_t id, ACE_Log_Msg::instance ()->release (); } -/* - * Definitions for methods declared in the transport class - * - */ - -// Constructor. TAO_Transport::TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) - : TAO_Synch_Refcountable (orb_core->resource_factory ()->create_cached_connection_lock (), 1) - , tag_ (tag) + : tag_ (tag) , orb_core_ (orb_core) , cache_map_entry_ (0) , bidirectional_flag_ (-1) @@ -135,8 +127,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, TAO_Transport::~TAO_Transport (void) { - ACE_ASSERT(this->refcount() == 0); - delete this->ws_; delete this->tms_; @@ -145,122 +135,17 @@ TAO_Transport::~TAO_Transport (void) // By the time the destructor is reached all the connection stuff // *must* have been cleaned up - ACE_ASSERT(this->head_ == 0); - ACE_ASSERT(this->cache_map_entry_ == 0); -} - - -/* - * - * Public utility methods that are called by other classes. - * - */ -/*static*/ TAO_Transport* -TAO_Transport::_duplicate (TAO_Transport* transport) -{ - if (transport != 0) - { - transport->increment (); - } - return transport; -} - -/*static*/ void -TAO_Transport::release (TAO_Transport* transport) -{ - if (transport != 0) - { - int count = transport->decrement (); - - if (count == 0) - { - delete transport; - } - else if (count < 0) - { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport[%d]::release, " - "reference count is less than zero: %d\n", - transport->id (), count)); - ACE_OS::abort (); - } - } + ACE_ASSERT (this->head_ == 0); + ACE_ASSERT (this->cache_map_entry_ == 0); } - void -TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered, - TAO_EventHandlerSet &unregistered) -{ - ACE_MT (ACE_GUARD (ACE_Lock, - guard, - *this->handler_lock_)); - ACE_Event_Handler *eh = this->event_handler_i (); - - if (eh != 0) - { - if (this->ws_->is_registered ()) - { - reactor_registered.set_bit (eh->get_handle ()); - } - else - { - unregistered.insert (eh); - } - } -} - - -int -TAO_Transport::register_handler (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); - if (this->check_event_handler_i ("Transport::register_handler") == -1) - return -1; - - return this->register_handler_i (); -} - - -ssize_t -TAO_Transport::send (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *timeout) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); - - if (this->check_event_handler_i ("Transport::send") == -1) - return -1; - - // now call the template method - return this->send_i (iov, iovcnt, bytes_transferred, timeout); -} - - -ssize_t -TAO_Transport::recv (char *buffer, - size_t len, - const ACE_Time_Value *timeout) +TAO_Transport::provide_handler (TAO_Connection_Handler_Set &handlers) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); - - if (this->check_event_handler_i ("Transport::recv") == -1) - return -1; - - // now call the template method - return this->recv_i (buffer, len, timeout); + this->add_reference (); + handlers.insert (this->connection_handler_i ()); } - int TAO_Transport::idle_after_send (void) { @@ -282,8 +167,32 @@ TAO_Transport::tear_listen_point_list (TAO_InputCDR &) void TAO_Transport::close_connection (void) { - TAO_Connection_Handler * eh = this->invalidate_event_handler (); - this->close_connection_shared (1, eh); + this->connection_handler_i ()->close_connection (); +} + +int +TAO_Transport::register_handler (void) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::register_handler\n", + this->id ())); + } + + ACE_Reactor *r = this->orb_core_->reactor (); + + if (r == this->event_handler_i ()->reactor ()) + return 0; + + // Set the flag in the Connection Handler and in the Wait Strategy + // @@Maybe we should set these flags after registering with the + // reactor. What if the registration fails??? + this->ws_->is_registered (1); + + // Register the handler with the reactor + return r->register_handler (this->event_handler_i (), + ACE_Event_Handler::READ_MASK); } int @@ -308,7 +217,6 @@ TAO_Transport::generate_locate_request ( return 0; } - int TAO_Transport::generate_request_header ( TAO_Operation_Details &opdetails, @@ -318,8 +226,8 @@ TAO_Transport::generate_request_header ( // codeset service context is only supposed to be sent in the first request // on a particular connection. if (this->first_request_) - this->orb_core()->codeset_manager()-> - generate_service_context( opdetails, *this ); + this->orb_core ()->codeset_manager ()-> + generate_service_context (opdetails, *this); if (this->messaging_object ()->generate_request_header (opdetails, spec, @@ -337,32 +245,6 @@ TAO_Transport::generate_request_header ( return 0; } - -/* - * NOTE: Some of these calls looks like ideal fodder for - * inlining. But, please note that the calls made within the method - * are not inlined. This would increase closure cost on the - * compiler. - */ -void -TAO_Transport::connection_handler_closing (void) -{ - // The connection has closed, we must invalidate the handler to - // ensure that any attempt to use this transport results in an - // error. Basically all the other methods in the Transport - // cooperate via check_event_handler_i() - - TAO_Connection_Handler * eh = this->invalidate_event_handler (); - this->send_connection_closed_notifications (); - - if (eh != 0) - { - // REFCNT: Matches incr_refcnt in XXX_Transport::XXX_Transport - // REFCNT: Only one of this or close_connection_shared() run - eh->decr_refcount(); - } -} - // @@TODO: Ideally the following should be inline. int TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc) @@ -378,7 +260,7 @@ TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc) void TAO_Transport::purge_entry (void) { - (void) this->transport_cache_manager ().purge_entry (this->cache_map_entry_); + this->transport_cache_manager ().purge_entry (this->cache_map_entry_); } int @@ -393,7 +275,6 @@ TAO_Transport::update_transport (void) return this->transport_cache_manager ().update_entry (this->cache_map_entry_); } - /* * * Methods called and used in the output path of the ORB. @@ -427,7 +308,6 @@ TAO_Transport::handle_output (void) return retval; } - int TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, size_t &bytes_transferred, @@ -435,9 +315,6 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - if (this->check_event_handler_i ("Transport::send_message_block_chain") == -1) - return -1; - return this->send_message_block_chain_i (mb, bytes_transferred, max_wait_time); @@ -490,22 +367,22 @@ TAO_Transport::send_message_shared (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { - int r; + int result; + { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - if (this->check_event_handler_i ("Transport::send_message_shared") == -1) - return -1; - - r = this->send_message_shared_i (stub, message_semantics, - message_block, max_wait_time); - } - if (r == -1) - { - this->close_connection (); + result = + this->send_message_shared_i (stub, message_semantics, + message_block, max_wait_time); } - return r; + if (result == -1) + { + this->close_connection (); + } + + return result; } int @@ -686,74 +563,6 @@ TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_mess return 0; } - -void -TAO_Transport::close_connection_i (void) -{ - TAO_Connection_Handler * eh = this->invalidate_event_handler_i (); - this->close_connection_shared (1, eh); -} - -void -TAO_Transport::close_connection_no_purge (void) -{ - TAO_Connection_Handler * eh = this->invalidate_event_handler (); - - this->close_connection_shared (0, - eh); -} - -void -TAO_Transport::close_connection_shared (int purge, - TAO_Connection_Handler * eh) -{ - // Purge the entry - if (purge) - { - this->transport_cache_manager ().purge_entry (this->cache_map_entry_); - } - - if (eh == 0) - { - // The connection was already closed - return; - } - - // Set the event handler in the connection close wait state. - (void) eh->connection_close_wait (); - - // NOTE: If the wait strategy is in blocking mode, then there is no - // chance that it could be inside the reactor. We can safely skip - // driving the LF. If <purge> is 0, then we are cleaned up by the - // cache. So no point in driving the LF either. - if (this->ws_->non_blocking () && purge) - { - // NOTE: This is a work around for BUG 1020. We drive the leader - // follower for a predetermined amount of time. Ideally this - // needs to be an ORB option. But this is just the first - // cut. Doing that will be a todo.. - - ACE_Time_Value tv (ACE_DEFAULT_TIMEOUT, 0); - this->orb_core_->leader_follower ().wait_for_event (eh, - this, - &tv); - - } - - // We need to explicitly shut it down to avoid memory leaks. - if (!eh->successful () || - !this->ws_->non_blocking ()) - { - eh->close_connection (); - } - - this->send_connection_closed_notifications (); - - // REFCNT: Matches incr_refcnt in XXX_Transport::XXX_Transport - // REFCNT: Only one of this or connection_handler_closing() run - eh->decr_refcount (); -} - int TAO_Transport::queue_is_empty_i (void) { @@ -765,12 +574,7 @@ int TAO_Transport::schedule_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; if (TAO_debug_level > 3) { @@ -786,12 +590,7 @@ int TAO_Transport::cancel_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; if (TAO_debug_level > 3) { @@ -857,14 +656,11 @@ TAO_Transport::drain_queue (void) int TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) { - if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1) - return -1; - size_t byte_count = 0; // ... send the message ... ssize_t retval = - this->send_i (iov, iovcnt, byte_count); + this->send (iov, iovcnt, byte_count); if (TAO_debug_level == 5) { @@ -896,7 +692,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " "error during %p\n", - this->id (), "send_i()")); + this->id (), "send()")); } if (errno == EWOULDBLOCK) return 0; @@ -981,14 +777,8 @@ TAO_Transport::drain_queue_i (void) if (this->flush_timer_pending ()) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - } + ACE_Reactor *reactor = eh->reactor (); + reactor->cancel_timer (this->flush_timer_id_); this->reset_flush_timer (); } return 1; @@ -1063,25 +853,19 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, if (set_timer) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - this->current_deadline_ = new_deadline; - ACE_Time_Value delay = - new_deadline - ACE_OS::gettimeofday (); + ACE_Reactor *reactor = eh->reactor (); + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); - if (this->flush_timer_pending ()) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - this->flush_timer_id_ = - reactor->schedule_timer (&this->transport_timer_, - &this->current_deadline_, - delay); - } + if (this->flush_timer_pending ()) + { + reactor->cancel_timer (this->flush_timer_id_); } + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); } return constraints_reached; @@ -1099,14 +883,6 @@ TAO_Transport::report_invalid_event_handler (const char *caller) } } -TAO_Connection_Handler * -TAO_Transport::invalidate_event_handler (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0)); - - return this->invalidate_event_handler_i (); -} - void TAO_Transport::send_connection_closed_notifications (void) { @@ -1152,7 +928,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, max_wait_time); } - // Let's figure out if the message should be queued without trying // to send first: int try_sending_first = 1; @@ -1287,8 +1062,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, return 0; } - - /* * * All the methods relevant to the incoming data path of the ORB are @@ -1296,9 +1069,9 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, * */ int -TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, - ACE_Time_Value * max_wait_time, - int /*block*/) +TAO_Transport::handle_input (TAO_Resume_Handle &rh, + ACE_Time_Value * max_wait_time, + int /*block*/) { if (TAO_debug_level > 3) { @@ -1319,9 +1092,8 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, "TAO (%P|%t) - Transport[%d]::handle_input_i, " "error while parsing the head of the queue\n", this->id())); - - this->send_connection_closed_notifications (); } + return retval; } @@ -1376,14 +1148,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, // If there is an error return to the reactor.. if (n <= 0) - { - if (n == -1) - { - this->send_connection_closed_notifications (); - } - - return n; - } + return n; if (TAO_debug_level > 2) { @@ -1492,7 +1257,6 @@ TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) "error in incoming message\n", this->id ())); - this->send_connection_closed_notifications (); return -1; } } @@ -1584,7 +1348,7 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, "error while trying to consolidate\n", this->id ())); } - this->send_connection_closed_notifications (); + return -1; } @@ -1936,8 +1700,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, "received CloseConnection message %p\n", this->id(), "")); - this->send_connection_closed_notifications (); - // Return a "-1" so that the next stage can take care of // closing connection and the necessary memory management. return -1; @@ -1945,17 +1707,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST || t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST) { - // Ready to process a request. Increment the refcount of <this - // transport>. Theoretically, after handler resumption another - // thread can access this very same transport and can even close - // this. To have a valid Transport object for further processing - // we should increment the refcount. Please see Bug 1382 for - // more details. - // REFCNT: Matched by the release before returning. - - // This generic class takes care of everything. - TAO_Transport_Refcount_Guard rg (this); - // Let us resume the handle before we go ahead to process the // request. This will open up the handle for other threads. rh.resume_handle (); @@ -1964,9 +1715,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, this, qd) == -1) { - this->send_connection_closed_notifications (); - - // Return a "-1" so that the next stage can take care of // closing connection and the necessary memory management. return -1; @@ -1975,8 +1723,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, else if (t == TAO_PLUGGABLE_MESSAGE_REPLY || t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) { - // Please see ..else if (XXX_REQUEST) for whys and whats.. - TAO_Transport_Refcount_Guard rg (this); rh.resume_handle (); // @@todo: Maybe the input_cdr can be constructed from the @@ -1993,7 +1739,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, "error in process_reply_message %p\n", this->id (), "")); - this->send_connection_closed_notifications (); return -1; } @@ -2009,7 +1754,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, "dispatch reply failed\n", this->id ())); - this->send_connection_closed_notifications (); return -1; } @@ -2160,25 +1904,10 @@ TAO_Transport::notify_reactor (void) ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - // Get the reactor associated with the event handler ACE_Reactor *reactor = this->orb_core ()->reactor (); - if (reactor == 0) - return -1; - - // NOTE: Instead of creating the handler seperately, it would be - // awesome if we could create the handler when we create the - // TAO_Queued_Data. That would save us an allocation. - TAO_Notify_Handler *nh = - TAO_Notify_Handler::create_handler ( - this, - eh->get_handle (), - this->orb_core ()->transport_message_buffer_allocator ()); - if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, @@ -2189,7 +1918,7 @@ TAO_Transport::notify_reactor (void) // Send a notification to the reactor... - int retval = reactor->notify (nh, + int retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK); if (retval < 0 && TAO_debug_level > 2) @@ -2226,7 +1955,17 @@ TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp) } } +ACE_Event_Handler::Reference_Count +TAO_Transport::add_reference (void) +{ + return this->event_handler_i ()->add_reference (); +} +ACE_Event_Handler::Reference_Count +TAO_Transport::remove_reference (void) +{ + return this->event_handler_i ()->remove_reference (); +} #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) |