diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 2569 |
1 files changed, 2569 insertions, 0 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp new file mode 100644 index 00000000000..ce44d35eb47 --- /dev/null +++ b/TAO/tao/Transport.cpp @@ -0,0 +1,2569 @@ +// $Id$ + +#include "tao/Transport.h" + +#include "tao/LF_Follower.h" +#include "tao/Leader_Follower.h" +#include "tao/Client_Strategy_Factory.h" +#include "tao/Wait_Strategy.h" +#include "tao/Transport_Mux_Strategy.h" +#include "tao/Stub.h" +#include "tao/Transport_Queueing_Strategies.h" +#include "tao/Connection_Handler.h" +#include "tao/Pluggable_Messaging.h" +#include "tao/Synch_Queued_Message.h" +#include "tao/Asynch_Queued_Message.h" +#include "tao/Flushing_Strategy.h" +#include "tao/Thread_Lane_Resources.h" +#include "tao/Resume_Handle.h" +#include "tao/Codeset_Manager.h" +#include "tao/Codeset_Translator_Base.h" +#include "tao/debug.h" +#include "tao/CDR.h" +#include "tao/ORB_Core.h" +#include "tao/MMAP_Allocator.h" +#include "tao/SystemException.h" + +#include "ace/OS_NS_sys_time.h" +#include "ace/OS_NS_stdio.h" +#include "ace/Reactor.h" +#include "ace/os_include/sys/os_uio.h" +#include "ace/High_Res_Timer.h" +#include "ace/CORBA_macros.h" + +/* + * Specialization hook to add include files from + * concrete transport implementation. + */ +//@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK + +#if !defined (__ACE_INLINE__) +# include "tao/Transport.inl" +#endif /* __ACE_INLINE__ */ + + +ACE_RCSID (tao, + Transport, + "$Id$") + +/* + * Static function in file scope + */ +static void +dump_iov (iovec *iov, int iovcnt, size_t id, + size_t current_transfer, + const char *location) +{ + ACE_Guard <ACE_Log_Msg> log_guard (*ACE_Log_Msg::instance ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ") + ACE_TEXT ("sending %d buffers\n"), + id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt)); + + for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) + { + size_t iov_len = iov[i].iov_len; + + // Possibly a partially sent iovec entry. 
+ if (current_transfer < iov_len) + { + iov_len = current_transfer; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ") + ACE_TEXT ("buffer %d/%d has %d bytes\n"), + id, ACE_TEXT_CHAR_TO_TCHAR(location), + i, iovcnt, + iov_len)); + + size_t len; + + for (size_t offset = 0; offset < iov_len; offset += len) + { + ACE_TCHAR header[1024]; + ACE_OS::sprintf (header, + ACE_TEXT("TAO - ") + ACE_TEXT("Transport[") + ACE_SIZE_T_FORMAT_SPECIFIER + ACE_TEXT("]::%s") + ACE_TEXT(" (") + ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/") + ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"), + id, location, offset, iov_len); + + len = iov_len - offset; + + if (len > 512) + { + len = 512; + } + + ACE_HEX_DUMP ((LM_DEBUG, + static_cast<char*> (iov[i].iov_base) + offset, + len, + header)); + } + current_transfer -= iov_len; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ") + ACE_TEXT ("end of data\n"), + id, ACE_TEXT_CHAR_TO_TCHAR(location))); +} + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +#if TAO_HAS_TRANSPORT_CURRENT == 1 +TAO::Transport::Stats::~Stats () +{ + // no-op +} +#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ + +TAO_Transport::TAO_Transport (CORBA::ULong tag, + TAO_ORB_Core *orb_core) + : tag_ (tag) + , orb_core_ (orb_core) + , cache_map_entry_ (0) + , bidirectional_flag_ (-1) + , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE) + , head_ (0) + , tail_ (0) + , incoming_message_queue_ (orb_core) + , current_deadline_ (ACE_Time_Value::zero) + , flush_timer_id_ (-1) + , transport_timer_ (this) + , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) + , id_ ((size_t) this) + , purging_order_ (0) + , recv_buffer_size_ (0) + , sent_byte_count_ (0) + , is_connected_ (false) + , char_translator_ (0) + , wchar_translator_ (0) + , tcs_set_ (0) + , first_request_ (1) + , partial_message_ (0) +#if TAO_HAS_SENDFILE == 1 + // The ORB has been configured to use the MMAP allocator, meaning + // we could/should use 
sendfile() to send data. Cast once rather + // here rather than during each send. This assumes that all + // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator + // instance as the underlying output CDR buffer allocator. + , mmap_allocator_ ( + dynamic_cast<TAO_MMAP_Allocator *> ( + orb_core->output_cdr_buffer_allocator ())) +#endif /* TAO_HAS_SENDFILE==1 */ +{ + TAO_Client_Strategy_Factory *cf = + this->orb_core_->client_factory (); + + // Create WS now. + this->ws_ = cf->create_wait_strategy (this); + + // Create TMS now. + this->tms_ = cf->create_transport_mux_strategy (this); + +#if TAO_HAS_TRANSPORT_CURRENT == 1 + // Allocate stats + ACE_NEW_THROW_EX (this->stats_, + TAO::Transport::Stats, + CORBA::NO_MEMORY ()); +#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ + + /* + * Hook to add code that initializes components that + * belong to the concrete protocol implementation. + * Further additions to this Transport class will + * need to add code *before* this hook. + */ + //@@ TAO_TRANSPORT_SPL_CONSTRUCTOR_ADD_HOOK +} + +TAO_Transport::~TAO_Transport (void) +{ + delete this->ws_; + + delete this->tms_; + + delete this->handler_lock_; + + if (!this->is_connected_) + { + // When we have a not connected transport we could have buffered + // messages on this transport which we have to cleanup now. + this->cleanup_queue_i(); + + // Cleanup our cache entry + this->purge_entry(); + } + + // Release the partial message block, however we may + // have never allocated one. + ACE_Message_Block::release (this->partial_message_); + + // By the time the destructor is reached here all the connection stuff + // *must* have been cleaned up. + + // The following assert is needed for the test "Bug_2494_Regression". + // See the bugzilla bug #2494 for details. 
+ ACE_ASSERT (this->head_ == 0); + ACE_ASSERT (this->cache_map_entry_ == 0); + +#if TAO_HAS_TRANSPORT_CURRENT == 1 + delete this->stats_; +#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ + + /* + * Hook to add code that cleans up components + * belong to the concrete protocol implementation. + * Further additions to this Transport class will + * need to add code *before* this hook. + */ + //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK +} + +void +TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers) +{ + (void) this->add_reference (); + + handlers.insert (this->connection_handler_i ()); +} + +bool +TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h) +{ + if (this->ws_->non_blocking () || + this->opening_connection_role_ == TAO::TAO_SERVER_ROLE) + return false; + + (void) this->add_reference (); + + h.insert (this->connection_handler_i ()); + + return true; +} + +bool +TAO_Transport::idle_after_send (void) +{ + return this->tms ()->idle_after_send (); +} + +bool +TAO_Transport::idle_after_reply (void) +{ + return this->tms ()->idle_after_reply (); +} + +/* + * A concrete transport class specializes this + * method. This hook allows commenting this function + * when TAO's transport is specialized. Note: All + * functions that have an implementation that does + * nothing should be added within this hook to + * enable specialization. 
+ */ +//@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_START + +int +TAO_Transport::tear_listen_point_list (TAO_InputCDR &) +{ + ACE_NOTSUP_RETURN (-1); +} + +int +TAO_Transport::send_message_shared (TAO_Stub *stub, + int message_semantics, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + int result = 0; + + { + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + + result = + this->send_message_shared_i (stub, message_semantics, + message_block, max_wait_time); + } + + if (result == -1) + { + this->close_connection (); + } + + return result; +} + +//@@ TAO_TRANSPORT_SPL_COMMENT_HOOK_END + +bool +TAO_Transport::post_connect_hook (void) +{ + return true; +} + +void +TAO_Transport::close_connection (void) +{ + this->connection_handler_i ()->close_connection (); +} + +int +TAO_Transport::register_handler (void) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), + this->id ())); + } + + ACE_Reactor * const r = this->orb_core_->reactor (); + + // @@note: This should be okay since the register handler call will + // not make a nested call into the transport. + ACE_GUARD_RETURN (ACE_Lock, + ace_mon, + *this->handler_lock_, + false); + + if (r == this->event_handler_i ()->reactor ()) + { + return 0; + } + + // Set the flag in the Connection Handler and in the Wait Strategy + // @@Maybe we should set these flags after registering with the + // reactor. What if the registration fails??? + this->ws_->is_registered (true); + + // Register the handler with the reactor + return r->register_handler (this->event_handler_i (), + ACE_Event_Handler::READ_MASK); +} + +#if TAO_HAS_SENDFILE == 1 +ssize_t +TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */, + iovec * iov, + int iovcnt, + size_t &bytes_transferred, + ACE_Time_Value const * timeout) +{ + // Concrete pluggable transport doesn't implement sendfile(). + // Fallback on TAO_Transport::send(). 
+ + // @@ We can probably refactor the TAO_IIOP_Transport::sendfile() + // implementation to this base class method, and leave any TCP + // specific configuration out of this base class method. + // -Ossama + return this->send (iov, iovcnt, bytes_transferred, timeout); +} +#endif /* TAO_HAS_SENDFILE==1 */ + +int +TAO_Transport::generate_locate_request ( + TAO_Target_Specification &spec, + TAO_Operation_Details &opdetails, + TAO_OutputCDR &output) +{ + if (this->messaging_object ()->generate_locate_request_header (opdetails, + spec, + output) + == -1) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") + ACE_TEXT ("error while marshalling the LocateRequest header\n"), + this->id ())); + } + + return -1; + } + + return 0; +} + +int +TAO_Transport::generate_request_header ( + TAO_Operation_Details &opdetails, + TAO_Target_Specification &spec, + TAO_OutputCDR &output) +{ + // codeset service context is only supposed to be sent in the first request + // on a particular connection. + if (this->first_request_) + { + TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager (); + if (csm) + csm->generate_service_context (opdetails,*this); + } + + if (this->messaging_object ()->generate_request_header (opdetails, + spec, + output) == -1) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ") + ACE_TEXT ("error while marshalling the Request header\n"), + this->id())); + } + + return -1; + } + + return 0; +} + +/// @todo Ideally the following should be inline. 
+/// @todo purge_entry has a return value, use it +int +TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc) +{ + // First purge our entry + this->purge_entry (); + + // Then add ourselves to the cache + return this->transport_cache_manager ().cache_transport (desc, + this); +} + +int +TAO_Transport::purge_entry (void) +{ + return this->transport_cache_manager ().purge_entry (this->cache_map_entry_); +} + +int +TAO_Transport::make_idle (void) +{ + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), + this->id ())); + } + + return this->transport_cache_manager ().make_idle (this->cache_map_entry_); +} + +int +TAO_Transport::update_transport (void) +{ + return this->transport_cache_manager ().update_entry (this->cache_map_entry_); +} + +/* + * + * Methods called and used in the output path of the ORB. + * + */ +int +TAO_Transport::handle_output (void) +{ + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"), + this->id ())); + } + + // The flushing strategy (potentially via the Reactor) wants to send + // more data, first check if there is a current message that needs + // more sending... 
+ int const retval = this->drain_queue (); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") + ACE_TEXT ("drain_queue returns %d/%d\n"), + this->id (), + retval, errno)); + } + + // Any errors are returned directly to the Reactor + return retval; +} + +int +TAO_Transport::format_queue_message (TAO_OutputCDR &stream, + ACE_Time_Value *max_wait_time) +{ + if (this->messaging_object ()->format_message (stream) != 0) + return -1; + + return this->queue_message_i (stream.begin (), max_wait_time); +} + +int +TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, + size_t &bytes_transferred, + ACE_Time_Value *max_wait_time) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + + return this->send_message_block_chain_i (mb, + bytes_transferred, + max_wait_time); +} + +int +TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, + size_t &bytes_transferred, + ACE_Time_Value *) +{ + const size_t total_length = mb->total_length (); + + // We are going to block, so there is no need to clone + // the message block. + TAO_Synch_Queued_Message synch_message (mb, + this->orb_core_); + + synch_message.push_back (this->head_, this->tail_); + + const int n = this->drain_queue_i (); + + if (n == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + return -1; // Error while sending... + } + else if (n == 1) + { + bytes_transferred = total_length; + return 1; // Empty queue, message was sent.. + } + + // Remove the temporary message from the queue... + synch_message.remove_from_list (this->head_, this->tail_); + + bytes_transferred = + total_length - synch_message.message_length (); + + return 0; +} + +int +TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, + ACE_Time_Value *max_wait_time) +{ + // We are going to block, so there is no need to clone + // the message block. 
+ TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); + const size_t message_length = synch_message.message_length (); + + synch_message.push_back (this->head_, this->tail_); + + const int n = this->send_synch_message_helper_i (synch_message, + 0 /*ignored*/); + if (n == -1 || n == 1) + { + return n; + } + + // @todo: Check for timeouts! + // if (max_wait_time != 0 && errno == ETIME) return -1; + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + int result = flushing_strategy->schedule_output (this); + if (result == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") + ACE_TEXT ("send_synchronous_message_i, ") + ACE_TEXT ("error while scheduling flush - %m\n"), + this->id ())); + } + return -1; + } + + // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, + // because we're always going to flush anyway. + + // Release the mutex, other threads may modify the queue as we + // block for a long time writing out data. + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + + result = flushing_strategy->flush_message (this, + &synch_message, + max_wait_time); + } + + if (result == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + + if (errno == ETIME) + { + // If partially sent, then we must queue the remainder. + if (message_length != synch_message.message_length ()) + { + // This is a timeout, there is only one nasty case: the + // message has been partially sent! We simply cannot take + // the message out of the queue, because that would corrupt + // the connection. + // + // What we do is replace the queued message with an + // asynchronous message, that contains only what remains of + // the timed out request. 
If you think about sending + // CancelRequests in this case: there is no much point in + // doing that: the receiving ORB would probably ignore it, + // and figuring out the request ID would be a bit of a + // nightmare. + // + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message ( + synch_message.current_block (), + this->orb_core_, + 0, // no timeout + 0, + 1), + -1); + queued_message->push_front (this->head_, this->tail_); + } + } + + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") + ACE_TEXT ("error while flushing message - %m\n"), + this->id ())); + } + + return -1; + } + + return 1; +} + + +int +TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, + ACE_Time_Value *max_wait_time) +{ + // Dont clone now.. We could be sent in one shot! + TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); + + synch_message.push_back (this->head_, + this->tail_); + + int const n = + this->send_synch_message_helper_i (synch_message, + max_wait_time); + + if (n == -1 || n == 1) + { + return n; + } + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") + ACE_TEXT ("preparing to add to queue before leaving\n"), + this->id ())); + } + + // Till this point we shouldn't have any copying and that is the + // point anyway. Now, remove the node from the list + synch_message.remove_from_list (this->head_, + this->tail_); + + // Clone the node that we have. 
+ TAO_Queued_Message *msg = + synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); + + // Stick it in the queue + msg->push_back (this->head_, + this->tail_); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + int const result = flushing_strategy->schedule_output (this); + + if (result == -1) + { + if (TAO_debug_level > 5) + { + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" + "message_i, dequeuing msg due to schedule_output " + "failure\n", this->id ())); + } + msg->remove_from_list (this->head_, this->tail_); + msg->destroy (); + } + else if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + (void) flushing_strategy->flush_message(this, msg, 0); + } + + return 1; +} + +int +TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message, + ACE_Time_Value * /*max_wait_time*/) +{ + // @@todo: Need to send timeouts for writing.. + int const n = this->drain_queue_i (); + + if (n == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + return -1; // Error while sending... + } + else if (n == 1) + { + return 1; // Empty queue, message was sent.. + } + + if (synch_message.all_data_sent ()) + { + return 1; + } + + return 0; +} + +int +TAO_Transport::queue_is_empty_i (void) +{ + return (this->head_ == 0); +} + + +int +TAO_Transport::schedule_output_i (void) +{ + ACE_Event_Handler * const eh = this->event_handler_i (); + ACE_Reactor * const reactor = eh->reactor (); + + if (reactor == 0) + return -1; + + // Check to see if our event handler is still registered with the + // reactor. It's possible for another thread to have run close_connection() + // since we last used the event handler. 
+ ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ()); + if (found != eh) + { + if(TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::schedule_output_i " + "event handler not found in reactor, returning -1\n", + this->id ())); + } + if (found) + { + found->remove_reference (); + } + return -1; + } + found->remove_reference (); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), + this->id ())); + } + + return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); +} + +int +TAO_Transport::cancel_output_i (void) +{ + ACE_Event_Handler * const eh = this->event_handler_i (); + ACE_Reactor *const reactor = eh->reactor (); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), + this->id ())); + } + + return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); +} + +int +TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, + const void *act) +{ + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ") + ACE_TEXT ("timer expired\n"), + this->id ())); + } + + /// This is the only legal ACT in the current configuration.... + if (act != &this->current_deadline_) + { + return -1; + } + + if (this->flush_timer_pending ()) + { + // The timer is always a oneshot timer, so mark is as not + // pending. 
+ this->reset_flush_timer (); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + int const result = flushing_strategy->schedule_output (this); + if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + (void) flushing_strategy->flush_transport (this); + } + } + + return 0; +} + +int +TAO_Transport::drain_queue (void) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + int const retval = this->drain_queue_i (); + + if (retval == 1) + { + // ... there is no current message or it was completely + // sent, cancel output... + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + flushing_strategy->cancel_output (this); + + return 0; + } + + return retval; +} + +int +TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) +{ + size_t byte_count = 0; + + // ... send the message ... + ssize_t retval = -1; + +#if TAO_HAS_SENDFILE == 1 + if (this->mmap_allocator_) + retval = this->sendfile (this->mmap_allocator_, + iov, + iovcnt, + byte_count); + else +#endif /* TAO_HAS_SENDFILE==1 */ + retval = this->send (iov, iovcnt, byte_count); + + if (TAO_debug_level == 5) + { + dump_iov (iov, iovcnt, this->id (), + byte_count, "drain_queue_helper"); + } + + // ... now we need to update the queue, removing elements + // that have been sent, and updating the last element if it + // was only partially sent ... 
+ this->cleanup_queue (byte_count); + iovcnt = 0; + + if (retval == 0) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") + ACE_TEXT ("send() returns 0\n"), + this->id ())); + } + return -1; + } + else if (retval == -1) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") + ACE_TEXT ("error during %p\n"), + this->id (), ACE_TEXT ("send()"))); + } + + if (errno == EWOULDBLOCK || errno == EAGAIN) + { + return 0; + } + + return -1; + } + + // ... start over, how do we guarantee progress? Because if + // no bytes are sent send() can only return 0 or -1 + + // Total no. of bytes sent for a send call + this->sent_byte_count_ += byte_count; + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") + ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), + this->id(), byte_count, (this->head_ == 0))); + } + + return 1; +} + +int +TAO_Transport::drain_queue_i (void) +{ + // This is the vector used to send data, it must be declared outside + // the loop because after the loop there may still be data to be + // sent + int iovcnt = 0; +#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) + iovec iov[ACE_IOV_MAX] = { 0 , 0 }; +#else + iovec iov[ACE_IOV_MAX]; +#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ + + // We loop over all the elements in the queue ... + TAO_Queued_Message *i = this->head_; + + // Reset the value so that the counting is done for each new send + // call. + this->sent_byte_count_ = 0; + + // Avoid calling this expensive function each time through the loop. Instead + // we'll assume that the time is unlikely to change much during the loop. + // If we are forced to send in the loop then we'll recompute the time. 
+ ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); + + while (i != 0) + { + if (i->is_expired (now)) + { + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ") + ACE_TEXT ("Discarding expired queued message.\n"), + this->id ())); + } + TAO_Queued_Message *next = i->next (); + i->state_changed (TAO_LF_Event::LFS_TIMEOUT, + this->orb_core_->leader_follower ()); + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + i = next; + continue; + } + // ... each element fills the iovector ... + i->fill_iov (ACE_IOV_MAX, iovcnt, iov); + + // ... the vector is full, no choice but to send some data out. + // We need to loop because a single message can span multiple + // IOV_MAX elements ... + if (iovcnt == ACE_IOV_MAX) + { + int const retval = + this->drain_queue_helper (iovcnt, iov); + + now = ACE_High_Res_Timer::gettimeofday_hr (); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") + ACE_TEXT ("helper retval = %d\n"), + this->id (), retval)); + } + + if (retval != 1) + { + return retval; + } + + i = this->head_; + continue; + } + // ... notice that this line is only reached if there is still + // room in the iovector ... 
+ i = i->next (); + } + + if (iovcnt != 0) + { + int const retval = this->drain_queue_helper (iovcnt, iov); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") + ACE_TEXT ("helper retval = %d\n"), + this->id (), retval)); + } + + if (retval != 1) + { + return retval; + } + } + + if (this->head_ == 0) + { + if (this->flush_timer_pending ()) + { + ACE_Event_Handler *eh = this->event_handler_i (); + ACE_Reactor * const reactor = eh->reactor (); + reactor->cancel_timer (this->flush_timer_id_); + this->reset_flush_timer (); + } + + return 1; + } + + return 0; +} + +void +TAO_Transport::cleanup_queue_i () +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") + ACE_TEXT ("cleaning up complete queue\n"), + this->id ())); + } + + size_t byte_count = 0; + int msg_count = 0; + + // Cleanup all messages + while (this->head_ != 0) + { + TAO_Queued_Message *i = this->head_; + + if (TAO_debug_level > 4) + { + byte_count += i->message_length(); + ++msg_count; + } + // @@ This is a good point to insert a flag to indicate that a + // CloseConnection message was successfully received. 
+ i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, + this->orb_core_->leader_follower ()); + + i->remove_from_list (this->head_, this->tail_); + + i->destroy (); + } + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") + ACE_TEXT ("discarded %d messages, %u bytes.\n"), + this->id (), msg_count, byte_count)); + } +} + +void +TAO_Transport::cleanup_queue (size_t byte_count) +{ + while (this->head_ != 0 && byte_count > 0) + { + TAO_Queued_Message *i = this->head_; + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") + ACE_TEXT ("byte_count = %d\n"), + this->id (), byte_count)); + } + + // Update the state of the first message + i->bytes_transferred (byte_count); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") + ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), + this->id (), byte_count, i->all_data_sent (), + i->message_length ())); + } + + // ... if all the data was sent the message must be removed from + // the queue... + if (i->all_data_sent ()) + { + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + } + } +} + +int +TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, + bool &must_flush) +{ + // First let's compute the size of the queue: + size_t msg_count = 0; + size_t total_bytes = 0; + + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + { + ++msg_count; + total_bytes += i->message_length (); + } + + bool set_timer; + ACE_Time_Value new_deadline; + + bool constraints_reached = + stub->transport_queueing_strategy (). + buffering_constraints_reached (stub, + msg_count, + total_bytes, + must_flush, + this->current_deadline_, + set_timer, + new_deadline); + + // ... set the new timer, also cancel any previous timers ... 
+ if (set_timer) + { + ACE_Event_Handler *eh = this->event_handler_i (); + ACE_Reactor * const reactor = eh->reactor (); + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); + + if (this->flush_timer_pending ()) + { + reactor->cancel_timer (this->flush_timer_id_); + } + + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); + } + + return constraints_reached; +} + +void +TAO_Transport::report_invalid_event_handler (const char *caller) +{ + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler") + ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"), + this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_)); + } +} + +void +TAO_Transport::send_connection_closed_notifications (void) +{ + { + ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); + + this->send_connection_closed_notifications_i (); + } + + this->tms ()->connection_closed (); +} + +void +TAO_Transport::send_connection_closed_notifications_i (void) +{ + this->cleanup_queue_i (); + + this->messaging_object ()->reset (); +} + +int +TAO_Transport::send_message_shared_i (TAO_Stub *stub, + int message_semantics, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + int ret = 0; + +#if TAO_HAS_TRANSPORT_CURRENT == 1 + size_t const message_length = message_block->length (); +#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ + + switch (message_semantics) + { + case TAO_Transport::TAO_TWOWAY_REQUEST: + ret = this->send_synchronous_message_i (message_block, + max_wait_time); + break; + + case TAO_Transport::TAO_REPLY: + ret = this->send_reply_message_i (message_block, + max_wait_time); + break; + + case TAO_Transport::TAO_ONEWAY_REQUEST: + ret = this->send_asynchronous_message_i (stub, + message_block, + max_wait_time); + break; + } + +#if TAO_HAS_TRANSPORT_CURRENT == 1 + // "Count" the 
message, only if no error was encountered.
+  if (ret != -1 && this->stats_ != 0)
+    this->stats_->messages_sent (message_length);
+#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
+
+  return ret;
+}
+
+// Send a oneway (asynchronous) message: when the outgoing queue is
+// empty and the stub's queueing strategy allows it, attempt to write
+// to the wire immediately; otherwise -- or after a partial write --
+// queue the remainder and schedule/flush output as required.
+int
+TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
+                                            const ACE_Message_Block *message_block,
+                                            ACE_Time_Value *max_wait_time)
+{
+  // Let's figure out if the message should be queued without trying
+  // to send first:
+  bool try_sending_first = true;
+
+  const bool queue_empty = (this->head_ == 0);
+
+  if (!queue_empty)
+    {
+      try_sending_first = false;
+    }
+  else if (stub->transport_queueing_strategy ().must_queue (queue_empty))
+    {
+      try_sending_first = false;
+    }
+
+  if (try_sending_first)
+    {
+      ssize_t n = 0;
+      size_t byte_count = 0;
+      // ... in this case we must try to send the message first ...
+
+      const size_t total_length = message_block->total_length ();
+
+      if (TAO_debug_level > 6)
+        {
+          ACE_DEBUG ((LM_DEBUG,
+                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
+                      ACE_TEXT ("trying to send the message (ml = %d)\n"),
+                      this->id (), total_length));
+        }
+
+      // @@ I don't think we want to hold the mutex here, however if
+      // we release it we need to recheck the status of the transport
+      // after we return... once I understand the final form for this
+      // code I will re-visit this decision
+      n = this->send_message_block_chain_i (message_block,
+                                            byte_count,
+                                            max_wait_time);
+      if (n == -1)
+        {
+          // ... if this is just an EWOULDBLOCK we must schedule the
+          // message for later, if it is ETIME we still have to send
+          // the complete message, because cutting off the message at
+          // this point will destroy the synchronization with the
+          // server ...
+          if (errno != EWOULDBLOCK && errno != ETIME)
+            {
+              if (TAO_debug_level > 0)
+                {
+                  ACE_ERROR ((LM_ERROR,
+                              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
+                              ACE_TEXT ("fatal error in ")
+                              ACE_TEXT ("send_message_block_chain_i - %m\n"),
+                              this->id ()));
+                }
+              return -1;
+            }
+        }
+
+      // ... let's figure out if the complete message was sent ...
+      if (total_length == byte_count)
+        {
+          // Done, just return. Notice that there are no allocations
+          // or copies up to this point (though some fancy calling
+          // back and forth).
+          // This is the common case for the critical path, it should
+          // be fast.
+          return 0;
+        }
+
+      // If it was partially sent, then we can't allow a timeout
+      if (byte_count > 0)
+        max_wait_time = 0;
+
+      if (TAO_debug_level > 6)
+        {
+          ACE_DEBUG ((LM_DEBUG,
+                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
+                      ACE_TEXT ("partial send %d / %d bytes\n"),
+                      this->id (), byte_count, total_length));
+        }
+
+      // ... part of the data was sent, need to figure out what piece
+      // of the message block chain must be queued ...
+      while (message_block != 0 && message_block->length () == 0)
+        {
+          message_block = message_block->cont ();
+        }
+
+      // ... at least some portion of the message block chain should
+      // remain ...
+    }
+
+  // ... either the message must be queued or we need to queue it
+  // because it was not completely sent out ...
+
+  if (TAO_debug_level > 6)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
+                  ACE_TEXT ("message is queued\n"),
+                  this->id ()));
+    }
+
+  if (this->queue_message_i (message_block, max_wait_time) == -1)
+    {
+      if (TAO_debug_level > 0)
+        {
+          ACE_DEBUG ((LM_DEBUG,
+                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+                      ACE_TEXT ("send_asynchronous_message_i, ")
+                      ACE_TEXT ("cannot queue message for - %m\n"),
+                      this->id ()));
+        }
+      return -1;
+    }
+
+  // ... if the queue is full we need to activate the output on the
+  // queue ...
+  bool must_flush = false;
+  const bool constraints_reached =
+    this->check_buffering_constraints_i (stub,
+                                         must_flush);
+
+  // ... but we also want to activate it if the message was partially
+  // sent.... Plus, when we use the blocking flushing strategy the
+  // queue is flushed as a side-effect of 'schedule_output()'
+
+  TAO_Flushing_Strategy *flushing_strategy =
+    this->orb_core ()->flushing_strategy ();
+
+  if (constraints_reached || try_sending_first)
+    {
+      int const result = flushing_strategy->schedule_output (this);
+      if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+        {
+          must_flush = true;
+        }
+    }
+
+  if (must_flush)
+    {
+      // NOTE(review): the handler lock is released (reverse lock) for
+      // the duration of the flush; transport state may change while
+      // flush_transport() runs.
+      typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+      TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+      ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+
+      (void) flushing_strategy->flush_transport (this);
+    }
+
+  return 0;
+}
+
+// Copy @a message_block into a TAO_Asynch_Queued_Message and append
+// it to the tail of the outgoing queue.  Returns -1 on allocation
+// failure.
+int
+TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
+                                ACE_Time_Value *max_wait_time)
+{
+  TAO_Queued_Message *queued_message = 0;
+  ACE_NEW_RETURN (queued_message,
+                  TAO_Asynch_Queued_Message (message_block,
+                                             this->orb_core_,
+                                             max_wait_time,
+                                             0,
+                                             1),
+                  -1);
+  queued_message->push_back (this->head_, this->tail_);
+
+  return 0;
+}
+
+/*
+ *
+ * All the methods relevant to the incoming data path of the ORB are
+ * defined below
+ *
+ */
+// Reactor input callback: first drain the incoming message queue,
+// then either read more data for a partially received message on the
+// incoming stack, or parse fresh data from the wire.
+int
+TAO_Transport::handle_input (TAO_Resume_Handle &rh,
+                             ACE_Time_Value * max_wait_time,
+                             int /* block */ /* deprecated parameter */ )
+{
+  if (TAO_debug_level > 3)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
+                  this->id ()));
+    }
+
+  // First try to process messages of the head of the incoming queue.
+  int const retval = this->process_queue_head (rh);
+
+  if (retval <= 0)
+    {
+      if (retval == -1)
+        {
+          if (TAO_debug_level > 2)
+            {
+              ACE_DEBUG ((LM_DEBUG,
+                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
+                          ACE_TEXT ("error while parsing the head of the queue\n"),
+                          this->id()));
+
+            }
+          return -1;
+        }
+      else
+        {
+          // retval == 0
+
+          // Processed a message in queue successfully. This
+          // thread must return to thread-pool now.
+          return 0;
+        }
+    }
+
+  TAO_Queued_Data *q_data = 0;
+
+  if (this->incoming_message_stack_.top (q_data) != -1
+      && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED)
+    {
+      /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */
+      if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
+        {
+          if (TAO_debug_level > 0)
+            {
+              ACE_ERROR ((LM_ERROR,
+                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
+                          ACE_TEXT ("error consolidating incoming message\n"),
+                          this->id ()));
+            }
+          return -1;
+        }
+    }
+  else
+    {
+      if (this->handle_input_parse_data (rh, max_wait_time) == -1)
+        {
+          if (TAO_debug_level > 0)
+            {
+              ACE_ERROR ((LM_ERROR,
+                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
+                          ACE_TEXT ("error parsing incoming message\n"),
+                          this->id ()));
+            }
+          return -1;
+        }
+    }
+
+  return 0;
+}
+
+// Process one completely received message: fragmented messages are
+// first consolidated via the messaging object; complete messages are
+// dispatched to process_parsed_messages().  @a q_data (or its
+// consolidated replacement) is released here.
+int
+TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
+                                            TAO_Resume_Handle &rh)
+{
+  // paranoid check
+  if (q_data->missing_data_ != 0)
+    {
+      if (TAO_debug_level > 0)
+        {
+          ACE_ERROR ((LM_ERROR,
+                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
+                      ACE_TEXT ("missing data\n"),
+                      this->id ()));
+        }
+      return -1;
+    }
+
+  if (q_data->more_fragments_ ||
+      q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+    {
+      // consolidate message on top of stack, only for fragmented messages
+      TAO_Queued_Data *new_q_data = 0;
+
+      switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
+        {
+        case -1: // error
+          return -1;
+
+        case 0: // returning
consolidated message in q_data
+          if (!new_q_data)
+            {
+              if (TAO_debug_level > 0)
+                {
+                  ACE_ERROR ((LM_ERROR,
+                              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
+                              ACE_TEXT ("error, consolidated message is NULL\n"),
+                              this->id ()));
+                }
+              return -1;
+            }
+
+
+          if (this->process_parsed_messages (new_q_data, rh) == -1)
+            {
+              TAO_Queued_Data::release (new_q_data);
+
+              if (TAO_debug_level > 0)
+                {
+                  ACE_ERROR ((LM_ERROR,
+                              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
+                              ACE_TEXT ("error processing consolidated message\n"),
+                              this->id ()));
+                }
+              return -1;
+            }
+
+          TAO_Queued_Data::release (new_q_data);
+
+          break;
+
+        case 1: // fragment has been stored in messaging_object()
+          break;
+        }
+    }
+  else
+    {
+      if (this->process_parsed_messages (q_data, rh) == -1)
+        {
+          TAO_Queued_Data::release (q_data);
+
+          if (TAO_debug_level > 0)
+            {
+              ACE_ERROR ((LM_ERROR,
+                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
+                          ACE_TEXT ("error processing message\n"),
+                          this->id ()));
+            }
+          return -1;
+        }
+
+      TAO_Queued_Data::release (q_data);
+
+    }
+
+  return 0;
+}
+
+// Consolidate a fragmented message (when applicable) and append the
+// resulting complete message to the tail of the incoming queue.  On
+// failure the queued data is released and -1 is returned.
+int
+TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
+{
+  // consolidate message on top of stack, only for fragmented messages
+
+  // paranoid check
+  if (q_data->missing_data_ != 0)
+    {
+      return -1;
+    }
+
+  if (q_data->more_fragments_ ||
+      q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+    {
+      TAO_Queued_Data *new_q_data = 0;
+
+      switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
+        {
+        case -1: // error
+          return -1;
+
+        case 0: // returning consolidated message in new_q_data
+          if (!new_q_data)
+            {
+              if (TAO_debug_level > 0)
+                {
+                  ACE_ERROR ((LM_ERROR,
+                              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
+                              ACE_TEXT ("error, consolidated message is NULL\n"),
+                              this->id ()));
+                }
+              return -1;
+            }
+
+          if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
+            {
+              TAO_Queued_Data::release (new_q_data);
+              return -1;
+            }
+          break;
+
+        case 1: // fragment has been stored in messaging_object()
+          break;
+        }
+    }
+  else
+    {
+      if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
+        {
+          TAO_Queued_Data::release (q_data);
+          return -1;
+        }
+    }
+
+  return 0; // success
+}
+
+// Read more data for the partially received message on top of the
+// incoming stack; once it is complete, pop it and hand it to
+// consolidate_process_message().
+int
+TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
+                                          ACE_Time_Value * max_wait_time,
+                                          TAO_Queued_Data *q_data)
+{
+  // paranoid check
+  if (q_data == 0)
+    {
+      return -1;
+    }
+
+  if (TAO_debug_level > 3)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
+                  ACE_TEXT ("enter (missing data == %d)\n"),
+                  this->id (), q_data->missing_data_));
+    }
+
+  size_t const recv_size = q_data->missing_data_;
+
+  // make sure the message_block has enough space
+  size_t const message_size = recv_size
+    + q_data->msg_block_->length();
+
+  if (q_data->msg_block_->space() < recv_size)
+    {
+      if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1)
+        {
+          return -1;
+        }
+    }
+
+  // Saving the size of the received buffer in case any one needs to
+  // get the size of the message that's received in the
+  // context. Obviously the value will be changed for each recv call
+  // and the user is supposed to invoke the accessor only in the
+  // invocation context to get meaningful information.
+  this->recv_buffer_size_ = recv_size;
+
+  // Read the message into the existing message block on heap
+  // NOTE(review): n == 0 is propagated to the caller as 0 (success),
+  // same as "no progress" -- confirm recv() maps a closed peer to -1
+  // rather than 0 on this path.
+  ssize_t const n = this->recv (q_data->msg_block_->wr_ptr(),
+                                recv_size,
+                                max_wait_time);
+
+
+  if (n <= 0)
+    {
+      return n;
+    }
+
+  if (TAO_debug_level > 3)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
+                  ACE_TEXT ("read bytes %d\n"),
+                  this->id (), n));
+    }
+
+  q_data->msg_block_->wr_ptr(n);
+  q_data->missing_data_ -= n;
+
+  if (q_data->missing_data_ == 0)
+    {
+      // paranoid check
+      if (this->incoming_message_stack_.pop (q_data) == -1)
+        {
+          return -1;
+        }
+
+      if (this->consolidate_process_message (q_data, rh) == -1)
+        {
+          return -1;
+        }
+    }
+
+  return 0;
+}
+
+
+// Extract all remaining messages from @a message_block: complete ones
+// are consolidated and enqueued; a trailing partial message is pushed
+// onto the incoming stack.
+int
+TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
+{
+
+  // store buffer status of last extraction: -1 parse error, 0
+  // incomplete message header in buffer, 1 complete messages header
+  // parsed
+  int buf_status = 0;
+
+  TAO_Queued_Data *q_data = 0; // init
+
+  // parse buffer until all messages have been extracted, consolidate
+  // and enqueue complete messages, if the last message being parsed
+  // has missing data, it stays on top of incoming_message_stack.
+  while (message_block.length () > 0 &&
+         (buf_status = this->messaging_object ()->extract_next_message
+          (message_block, q_data)) != -1 &&
+         q_data != 0) // paranoid check
+    {
+      if (q_data->missing_data_ == 0)
+        {
+          if (this->consolidate_enqueue_message (q_data) == -1)
+            {
+              return -1;
+            }
+        }
+      else // incomplete message read, probably the last message in buffer
+        {
+          // can not fail
+          this->incoming_message_stack_.push (q_data);
+        }
+
+      q_data = 0; // reset
+    } // while
+
+  if (buf_status == -1)
+    {
+      return -1;
+    }
+
+  return 0;
+}
+
+// Read fresh data from the wire into a stack buffer and parse it:
+// either consolidate a header-incomplete message from the incoming
+// stack, or process the first parsed message in place (critical
+// path), queueing any further messages for later processing.  Any
+// leftover bytes are saved in partial_message_ for the next call.
+int
+TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
+                                        ACE_Time_Value * max_wait_time)
+{
+
+  if (TAO_debug_level > 3)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
+                  ACE_TEXT ("enter\n"),
+                  this->id ()));
+    }
+
+
+  // The buffer on the stack which will be used to hold the input
+  // messages, ACE_CDR::MAX_ALIGNMENT compensates the
+  // memory-alignment. This improves performance with SUN-Java-ORB-1.4
+  // and higher that sends fragmented requests of size 1024 bytes.
+  char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
+
+#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
+  (void) ACE_OS::memset (buf,
+                         '\0',
+                         sizeof buf);
+#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
+
+  // Create a data block
+  ACE_Data_Block db (sizeof (buf),
+                     ACE_Message_Block::MB_DATA,
+                     buf,
+                     this->orb_core_->input_cdr_buffer_allocator (),
+                     this->orb_core_->locking_strategy (),
+                     ACE_Message_Block::DONT_DELETE,
+                     this->orb_core_->input_cdr_dblock_allocator ());
+
+  // Create a message block
+  ACE_Message_Block message_block (&db,
+                                   ACE_Message_Block::DONT_DELETE,
+                                   this->orb_core_->input_cdr_msgblock_allocator ());
+
+
+  // Align the message block
+  ACE_CDR::mb_align (&message_block);
+
+  size_t recv_size = 0; // Note: unsigned integer
+
+  // Pointer to newly parsed message
+  TAO_Queued_Data *q_data = 0;
+
+  // optimizing access of constants
+  const size_t header_length =
+    this->messaging_object ()->header_length ();
+
+  // paranoid check
+  if (header_length > message_block.space ())
+    {
+      return -1;
+    }
+
+  if (this->orb_core_->orb_params ()->single_read_optimization ())
+    {
+      recv_size =
+        message_block.space ();
+    }
+  else
+    {
+      // Single read optimization has been de-activated. That means
+      // that we need to read from transport the GIOP header first
+      // before the payload. This codes first checks the incoming
+      // stack for partial messages which needs to be
+      // consolidated. Otherwise we are in new cycle, reading complete
+      // GIOP header of new incoming message.
+      if (this->incoming_message_stack_.top (q_data) != -1
+          && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
+        {
+          // There is a partial message on incoming_message_stack_
+          // whose length is unknown so far. We need to consolidate
+          // the GIOP header to get to know the payload size,
+          recv_size = header_length - q_data->msg_block_->length ();
+        }
+      else
+        {
+          // Read amount of data forming GIOP header of new incoming
+          // message.
+          recv_size = header_length;
+        }
+      // POST: 0 <= recv_size <= header_length
+    }
+  // POST: 0 <= recv_size <= message_block->space ()
+
+  // If we have a partial message, copy it into our message block and
+  // clear out the partial message.
+  if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
+    {
+      // (*) Copy back the partial message into current read-buffer,
+      // verify that the read-strategy of "recv_size" bytes is not
+      // exceeded. The latter check guarantees that recv_size does not
+      // roll-over and keeps in range
+      // 0<=recv_size<=message_block->space()
+      if (this->partial_message_->length () <= recv_size &&
+          message_block.copy (this->partial_message_->rd_ptr (),
+                              this->partial_message_->length ()) == 0)
+        {
+
+          recv_size -= this->partial_message_->length ();
+          this->partial_message_->reset ();
+        }
+      else
+        {
+          return -1;
+        }
+    }
+  // POST: 0 <= recv_size <= buffer_space
+
+  if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
+    {
+      // This event would cause endless looping, trying frequently to
+      // read zero bytes from stream. This might happen, if TAOs
+      // protocol implementation is not correct and tries to read data
+      // beyond header without "single_read_optimization" being
+      // activated.
+      if (TAO_debug_level > 0)
+        {
+          ACE_ERROR ((LM_ERROR,
+                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
+                      ACE_TEXT ("Error - endless loop detection, closing connection"),
+                      this->id ()));
+        }
+      return -1;
+    }
+
+  // Saving the size of the received buffer in case any one needs to
+  // get the size of the message that's received in the
+  // context. Obviously the value will be changed for each recv call
+  // and the user is supposed to invoke the accessor only in the
+  // invocation context to get meaningful information.
+  this->recv_buffer_size_ = recv_size;
+
+  // Read the message into the message block that we have created on
+  // the stack.
+  const ssize_t n = this->recv (message_block.wr_ptr (),
+                                recv_size,
+                                max_wait_time);
+
+  // If there is an error return to the reactor..
+  if (n <= 0)
+    {
+      return n;
+    }
+
+  if (TAO_debug_level > 3)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
+                  ACE_TEXT ("read %d bytes\n"),
+                  this->id (), n));
+    }
+
+  // Set the write pointer in the stack buffer
+  message_block.wr_ptr (n);
+
+  //
+  // STACK PROCESSING OR MESSAGE CONSOLIDATION
+  //
+
+  // PRE: data in buffer is aligned && message_block.length() > 0
+
+  if (this->incoming_message_stack_.top (q_data) != -1
+      && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
+    {
+      //
+      // MESSAGE CONSOLIDATION
+      //
+
+      // Partial message on incoming_message_stack_ needs to be
+      // consolidated. The message header could not be parsed so far
+      // and therefore the message size is unknown yet. Consolidating
+      // the message destroys the memory alignment of succeeding
+      // messages sharing the buffer, for that reason consolidation
+      // and stack based processing are mutually exclusive.
+      if (this->messaging_object ()->consolidate_node (q_data,
+                                                       message_block) == -1)
+        {
+          if (TAO_debug_level > 0)
+            {
+              ACE_ERROR ((LM_ERROR,
+                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
+                          ACE_TEXT ("error consolidating message from input buffer\n"),
+                          this->id () ));
+            }
+          return -1;
+        }
+
+      // Complete message are to be enqueued and later processed
+      if (q_data->missing_data_ == 0)
+        {
+          if (this->incoming_message_stack_.pop (q_data) == -1)
+            {
+              return -1;
+            }
+
+          if (this->consolidate_enqueue_message (q_data) == -1)
+            {
+              return -1;
+            }
+        }
+
+      if (message_block.length () > 0
+          && this->handle_input_parse_extra_messages (message_block) == -1)
+        {
+          return -1;
+        }
+
+      // In any case try to process the enqueued messages
+      if (this->process_queue_head (rh) == -1)
+        {
+          return -1;
+        }
+    }
+  else
+    {
+      //
+      // STACK PROCESSING (critical path)
+      //
+
+      // Process the first message in buffer on stack
+
+      // (PRE: first message resides in aligned memory) Make a node of
+      // the message-block..
+
+      TAO_Queued_Data qd (&message_block,
+                          this->orb_core_->transport_message_buffer_allocator ());
+
+      size_t mesg_length = 0;
+
+      if (this->messaging_object ()->parse_next_message (message_block,
+                                                         qd,
+                                                         mesg_length) == -1
+          || (qd.missing_data_ == 0
+              && mesg_length > message_block.length ()) )
+        {
+          // extracting message failed
+          return -1;
+        }
+      // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
+      // This prevents seeking rd_ptr behind the wr_ptr
+
+      if (qd.missing_data_ != 0 ||
+          qd.more_fragments_ ||
+          qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+        {
+          if (qd.missing_data_ == 0)
+            {
+              // Dealing with a fragment
+              TAO_Queued_Data *nqd =
+                TAO_Queued_Data::duplicate (qd);
+
+              if (nqd == 0)
+                {
+                  return -1;
+                }
+
+              // mark the end of message in new buffer
+              char* end_mark = nqd->msg_block_->rd_ptr ()
+                + mesg_length;
+              nqd->msg_block_->wr_ptr (end_mark);
+
+              // move the read pointer forward in old buffer
+              message_block.rd_ptr (mesg_length);
+
+              // enqueue the message
+              if (this->consolidate_enqueue_message (nqd) == -1)
+                {
+                  return -1;
+                }
+
+              if (message_block.length () > 0
+                  && this->handle_input_parse_extra_messages (message_block) == -1)
+                {
+                  return -1;
+                }
+
+              // In any case try to process the enqueued messages
+              if (this->process_queue_head (rh) == -1)
+                {
+                  return -1;
+                }
+            }
+          else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED)
+            {
+              // Incomplete message, must be the last one in buffer
+
+              // NOTE(review): the first half of this condition is
+              // redundant -- it is already implied by the enclosing
+              // else-if.
+              if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED &&
+                  qd.missing_data_ > message_block.space ())
+                {
+                  // Re-Allocate correct size on heap
+                  if (ACE_CDR::grow (qd.msg_block_,
+                                     message_block.length ()
+                                     + qd.missing_data_) == -1)
+                    {
+                      return -1;
+                    }
+                }
+
+              TAO_Queued_Data *nqd =
+                TAO_Queued_Data::duplicate (qd);
+
+              if (nqd == 0)
+                {
+                  return -1;
+                }
+
+              // move read-pointer to end of buffer
+              message_block.rd_ptr (message_block.length());
+
+              this->incoming_message_stack_.push (nqd);
+            }
+        }
+      else
+        {
+          //
+          // critical path
+          //
+
+          // We can't process the message on stack right now. First we
+          // have got to parse extra messages from message_block,
+          // putting them into queue. When this is done we can return
+          // to process this message, and notifying other threads to
+          // process the messages in queue.
+
+          char * end_marker = message_block.rd_ptr ()
+            + mesg_length;
+
+          if (message_block.length () > mesg_length)
+            {
+              // There are more messages in data stream to be parsed.
+              // Save the rd_ptr to restore later.
+              char *rd_ptr_stack_mesg = message_block.rd_ptr ();
+
+              // Skip parsed message, jump to next message in buffer
+              // PRE: mesg_length <= message_block.length ()
+              message_block.rd_ptr (mesg_length);
+
+              // Extract remaining messages and enqueue them for later
+              // heap processing
+              if (this->handle_input_parse_extra_messages (message_block) == -1)
+                {
+                  return -1;
+                }
+
+              // correct the end_marker
+              end_marker = message_block.rd_ptr ();
+
+              // Restore rd_ptr
+              message_block.rd_ptr (rd_ptr_stack_mesg);
+            }
+
+          // The following if-else has been copied from
+          // process_queue_head(). While process_queue_head()
+          // processes message on heap, here we will process a message
+          // on stack.
+
+          // Now that we have one message on stack to be processed,
+          // check whether we have one more message in the queue...
+          if (this->incoming_message_queue_.queue_length () > 0)
+            {
+              if (TAO_debug_level > 0)
+                {
+                  ACE_DEBUG ((LM_DEBUG,
+                              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
+                              ACE_TEXT ("notify reactor\n"),
+                              this->id ()));
+
+                }
+
+              const int retval = this->notify_reactor ();
+
+              if (retval == 1)
+                {
+                  // Let the class know that it doesn't need to resume the
+                  // handle..
+                  rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+                }
+              else if (retval < 0)
+                return -1;
+            }
+          else
+            {
+              // As there are no further messages in queue just resume
+              // the handle. Set the flag in case someone had reset the flag..
+              rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+            }
+
+          // PRE: incoming_message_queue is empty
+          if (this->process_parsed_messages (&qd,
+                                             rh) == -1)
+            {
+              return -1;
+            }
+
+          // move the rd_ptr to position of end_marker
+          message_block.rd_ptr (end_marker);
+        }
+    }
+
+  // Now that all cases have been processed, there might be kept some data
+  // in buffer that needs to be saved for next "handle_input" invocations.
+  if (message_block.length () > 0)
+    {
+      if (this->partial_message_ == 0)
+        {
+          this->allocate_partial_message_block ();
+        }
+
+      if (this->partial_message_ != 0 &&
+          this->partial_message_->copy (message_block.rd_ptr (),
+                                        message_block.length ()) == 0)
+        {
+          message_block.rd_ptr (message_block.length ());
+        }
+      else
+        {
+          return -1;
+        }
+    }
+
+  return 0;
+}
+
+
+// Dispatch one fully received message according to its type
+// (request/locate-request, reply/locate-reply, close-connection,
+// cancel-request, message-error).  Returns -1 when the connection
+// must be closed by the caller.
+int
+TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
+                                        TAO_Resume_Handle &rh)
+{
+  if (TAO_debug_level > 7)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+                  ACE_TEXT ("entering (missing data == %d)\n"),
+                  this->id(), qd->missing_data_));
+    }
+
+  // Get the <message_type> that we have received
+  const TAO_Pluggable_Message_Type t = qd->msg_type_;
+
+#if TAO_HAS_TRANSPORT_CURRENT == 1
+  // Update stats, if any
+  if (this->stats_ != 0)
+    this->stats_->messages_received (qd->msg_block_->length ());
+#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
+
+  if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
+    {
+      if (TAO_debug_level > 0)
+        ACE_DEBUG ((LM_DEBUG,
+                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+                    ACE_TEXT ("received CloseConnection message - %m\n"),
+                    this->id()));
+
+      // Return a "-1" so that the next stage can take care of
+      // closing connection and the necessary memory management.
+      return -1;
+    }
+  else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
+           t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
+    {
+      // Let us resume the handle before we go ahead to process the
+      // request. This will open up the handle for other threads.
+      rh.resume_handle ();
+
+      if (this->messaging_object ()->process_request_message (
+            this,
+            qd) == -1)
+        {
+          // Return a "-1" so that the next stage can take care of
+          // closing connection and the necessary memory management.
+          return -1;
+        }
+    }
+  else if (t == TAO_PLUGGABLE_MESSAGE_REPLY ||
+           t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
+    {
+      rh.resume_handle ();
+
+      TAO_Pluggable_Reply_Params params (this);
+
+      if (this->messaging_object ()->process_reply_message (params,
+                                                            qd) == -1)
+        {
+          if (TAO_debug_level > 0)
+            ACE_DEBUG ((LM_DEBUG,
+                        ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+                        ACE_TEXT ("error in process_reply_message - %m\n"),
+                        this->id ()));
+
+          return -1;
+        }
+
+    }
+  else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST)
+    {
+      // The associated request might be incomplete residing
+      // fragmented in messaging object. We must make sure the
+      // resources allocated by fragments are released.
+
+      if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
+        {
+          if (TAO_debug_level > 0)
+            {
+              ACE_ERROR ((LM_ERROR,
+                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+                          ACE_TEXT ("error processing CancelRequest\n"),
+                          this->id ()));
+            }
+        }
+
+      // We are not able to cancel requests being processed already;
+      // this is declared as optional feature by CORBA, and TAO does
+      // not support this currently.
+
+      // Just continue processing, CancelRequest does not mean to cut
+      // off the connection.
+    }
+  else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+    {
+      if (TAO_debug_level > 0)
+        {
+          ACE_ERROR ((LM_ERROR,
+                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+                      ACE_TEXT ("received MessageError, closing connection\n"),
+                      this->id ()));
+        }
+      return -1;
+    }
+
+  // If not, just return back..
+  return 0;
+}
+
+// Dequeue and process the message at the head of the incoming queue.
+// Returns 0 when a message was processed, 1 when the queue was
+// empty, and -1 on error.
+int
+TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
+{
+  if (TAO_debug_level > 3)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
+                  this->id (), this->incoming_message_queue_.queue_length () ));
+    }
+
+  // See if message in queue ...
+  if (this->incoming_message_queue_.queue_length () > 0)
+    {
+      // Get the message on the head of the queue..
+      TAO_Queued_Data *qd =
+        this->incoming_message_queue_.dequeue_head ();
+
+      if (TAO_debug_level > 3)
+        {
+          ACE_DEBUG ((LM_DEBUG,
+                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
+                      ACE_TEXT ("the size of the queue is [%d]\n"),
+                      this->id (),
+                      this->incoming_message_queue_.queue_length()));
+        }
+      // Now that we have pulled one message out of the queue,
+      // check whether we have one more message in the queue...
+      if (this->incoming_message_queue_.queue_length () > 0)
+        {
+          if (TAO_debug_level > 0)
+            {
+              ACE_DEBUG ((LM_DEBUG,
+                          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
+                          ACE_TEXT ("notify reactor\n"),
+                          this->id ()));
+
+            }
+
+          const int retval = this->notify_reactor ();
+
+          if (retval == 1)
+            {
+              // Let the class know that it doesn't need to resume the
+              // handle..
+              rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+            }
+          else if (retval < 0)
+            return -1;
+        }
+      else
+        {
+          // As we are ready to process the last message just resume
+          // the handle. Set the flag in case someone had reset the flag..
+          rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+        }
+
+      // Process the message...
+      if (this->process_parsed_messages (qd, rh) == -1)
+        {
+          return -1;
+        }
+
+      // Delete the Queued_Data..
+      TAO_Queued_Data::release (qd);
+
+      return 0;
+    }
+
+  return 1;
+}
+
+// Ask the reactor to re-dispatch this transport's event handler so
+// another thread can pick up the remaining queued messages.  Returns
+// 0 when the wait strategy is not registered with the reactor, 1
+// otherwise (even when the notify itself failed -- see @@todo below).
+int
+TAO_Transport::notify_reactor (void)
+{
+  if (!this->ws_->is_registered ())
+    {
+      return 0;
+    }
+
+  ACE_Event_Handler *eh = this->event_handler_i ();
+
+  // Get the reactor associated with the event handler
+  ACE_Reactor *reactor = this->orb_core ()->reactor ();
+
+  if (TAO_debug_level > 0)
+    {
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
+                  ACE_TEXT ("notify to Reactor\n"),
+                  this->id ()));
+    }
+
+
+  // Send a notification to the reactor...
+  const int retval = reactor->notify (eh,
+                                      ACE_Event_Handler::READ_MASK);
+
+  if (retval < 0 && TAO_debug_level > 2)
+    {
+      // @@todo: need to think about what is the action that
+      // we can take when we get here.
+      ACE_DEBUG ((LM_DEBUG,
+                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
+                  ACE_TEXT ("notify to the reactor failed..\n"),
+                  this->id ()));
+    }
+
+  return 1;
+}
+
+// Accessor: the transport cache owned by this ORB's thread-lane
+// resources.
+TAO::Transport_Cache_Manager &
+TAO_Transport::transport_cache_manager (void)
+{
+  return this->orb_core_->lane_resources ().transport_cache ();
+}
+
+// Install this transport's negotiated codeset translators (when
+// present) on the given CDR streams.
+void
+TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
+{
+  if (this->char_translator_)
+    {
+      this->char_translator_->assign (inp);
+      this->char_translator_->assign (outp);
+    }
+  if (this->wchar_translator_)
+    {
+      this->wchar_translator_->assign (inp);
+      this->wchar_translator_->assign (outp);
+    }
+}
+
+// Remove any char/wchar translators from the given CDR streams.
+void
+TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
+{
+  if (inp)
+    {
+      inp->char_translator (0);
+      inp->wchar_translator (0);
+    }
+  if (outp)
+    {
+      outp->char_translator (0);
+      outp->wchar_translator (0);
+    }
+}
+
+// Reference counting is delegated to the underlying event handler.
+ACE_Event_Handler::Reference_Count
+TAO_Transport::add_reference (void)
+{
+  return this->event_handler_i ()->add_reference ();
+}
+
+ACE_Event_Handler::Reference_Count
+TAO_Transport::remove_reference (void)
+{
+  return this->event_handler_i ()->remove_reference ();
+}
+
+TAO_OutputCDR &
+TAO_Transport::out_stream (void)
+{
+  return this->messaging_object ()->out_stream ();
+}
+
+// Prepare for closure: mark the transport disconnected, purge its
+// cache entry, and drop queued output under the handler lock.
+void
+TAO_Transport::pre_close (void)
+{
+  this->is_connected_ = false;
+  this->purge_entry ();
+  {
+    ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
+    this->cleanup_queue_i ();
+  }
+}
+
+// Finish connection establishment: record the cache @a id, mark the
+// transport connected, and -- when output is already queued -- let
+// the wait strategy register the handler with the reactor so the
+// queue gets flushed.  Returns false on registration failure (the
+// connection is purged and closed).
+bool
+TAO_Transport::post_open (size_t id)
+{
+  this->id_ = id;
+
+  {
+    ACE_GUARD_RETURN (ACE_Lock,
+                      ace_mon,
+                      *this->handler_lock_,
+                      false);
+    this->is_connected_ = true;
+  }
+
+  // When we have data in our outgoing queue schedule ourselves
+  // for output
+  if (this->queue_is_empty_i ())
+    return true;
+
+  // If the wait strategy wants us to be registered with the reactor
+  // then we do so. If registration is required and it succeeds,
+  // #REFCOUNT# becomes two.
+  if (this->wait_strategy ()->register_handler () != 0)
+    {
+      // Registration failures.
+
+      // Purge from the connection cache, if we are not in the cache, this
+      // just does nothing.
+      (void) this->purge_entry ();
+
+      // Close the handler.
+      (void) this->close_connection ();
+
+      if (TAO_debug_level > 0)
+        ACE_ERROR ((LM_ERROR,
+                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
+                    ACE_TEXT ("could not register the transport ")
+                    ACE_TEXT ("in the reactor.\n"),
+                    this->id ()));
+
+      return false;
+    }
+
+  return true;
+}
+
+// Lazily allocate the heap buffer used to carry a trailing partial
+// message over to the next handle_input() invocation.
+void
+TAO_Transport::allocate_partial_message_block (void)
+{
+  if (this->partial_message_ == 0)
+    {
+      // This value must be at least large enough to hold a GIOP message
+      // header plus a GIOP fragment header
+      const size_t partial_message_size =
+        this->messaging_object ()->header_length ();
+        // + this->messaging_object ()->fragment_header_length ();
+        // deprecated, conflicts with not-single_read_opt.
+
+      ACE_NEW (this->partial_message_,
+               ACE_Message_Block (partial_message_size));
+    }
+}
+
+/*
+ * Hook to add concrete implementations from the derived class onto
+ * TAO's transport.
+ */
+
+//@@ TAO_TRANSPORT_SPL_METHODS_ADD_HOOK
+
+TAO_END_VERSIONED_NAMESPACE_DECL