diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 1674 |
1 files changed, 885 insertions, 789 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 2a1f66c5a78..989fa5624ab 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -1,26 +1,26 @@ // $Id$ -#include "tao/Transport.h" - -#include "tao/LF_Follower.h" -#include "tao/Leader_Follower.h" -#include "tao/Client_Strategy_Factory.h" -#include "tao/Wait_Strategy.h" -#include "tao/Transport_Mux_Strategy.h" -#include "tao/Stub.h" -#include "tao/Transport_Queueing_Strategies.h" -#include "tao/Connection_Handler.h" -#include "tao/Pluggable_Messaging.h" -#include "tao/Synch_Queued_Message.h" -#include "tao/Asynch_Queued_Message.h" -#include "tao/Flushing_Strategy.h" -#include "tao/Thread_Lane_Resources.h" -#include "tao/Resume_Handle.h" -#include "tao/Codeset_Manager.h" -#include "tao/Codeset_Translator_Base.h" -#include "tao/debug.h" -#include "tao/CDR.h" -#include "tao/ORB_Core.h" +#include "Transport.h" + +#include "LF_Follower.h" +#include "Leader_Follower.h" +#include "Client_Strategy_Factory.h" +#include "Wait_Strategy.h" +#include "Transport_Mux_Strategy.h" +#include "Stub.h" +#include "Transport_Queueing_Strategies.h" +#include "Connection_Handler.h" +#include "Pluggable_Messaging.h" +#include "Synch_Queued_Message.h" +#include "Asynch_Queued_Message.h" +#include "Flushing_Strategy.h" +#include "Thread_Lane_Resources.h" +#include "Resume_Handle.h" +#include "Codeset_Manager.h" +#include "Codeset_Translator_Base.h" +#include "debug.h" +#include "CDR.h" +#include "ORB_Core.h" #include "ace/OS_NS_sys_time.h" #include "ace/OS_NS_stdio.h" @@ -34,7 +34,7 @@ //@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK #if !defined (__ACE_INLINE__) -# include "tao/Transport.inl" +# include "Transport.inl" #endif /* __ACE_INLINE__ */ @@ -55,7 +55,7 @@ dump_iov (iovec *iov, int iovcnt, size_t id, ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ") ACE_TEXT ("sending %d buffers\n"), - id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt)); + id, ACE_TEXT_TO_TCHAR_IN (location), iovcnt)); for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) { @@ -70,7 +70,7 @@ dump_iov (iovec *iov, int iovcnt, size_t id, ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ") ACE_TEXT ("buffer %d/%d has %d bytes\n"), - id, ACE_TEXT_CHAR_TO_TCHAR(location), + id, ACE_TEXT_TO_TCHAR_IN(location), i, iovcnt, iov_len)); @@ -107,13 +107,11 @@ dump_iov (iovec *iov, int iovcnt, size_t id, ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ") ACE_TEXT ("end of data\n"), - id, ACE_TEXT_CHAR_TO_TCHAR(location))); + id, ACE_TEXT_TO_TCHAR_IN(location))); ACE_Log_Msg::instance ()->release (); } -TAO_BEGIN_VERSIONED_NAMESPACE_DECL - TAO_Transport::TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) : tag_ (tag) @@ -181,9 +179,6 @@ TAO_Transport::~TAO_Transport (void) // By the time the destructor is reached here all the connection stuff // *must* have been cleaned up. - - // The following assert is needed for the test "Bug_2494_Regression". - // See the bugzilla bug #2494 for details. ACE_ASSERT (this->head_ == 0); ACE_ASSERT (this->cache_map_entry_ == 0); @@ -290,7 +285,7 @@ TAO_Transport::register_handler (void) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), + "TAO (%P|%t) - Transport[%d]::register_handler\n", this->id ())); } @@ -332,8 +327,8 @@ TAO_Transport::generate_locate_request ( if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") - ACE_TEXT ("error while marshalling the LocateRequest header\n"), + "TAO (%P|%t) - Transport[%d]::generate_locate_request, " + "error while marshalling the LocateRequest header\n", this->id ())); } @@ -365,8 +360,8 @@ TAO_Transport::generate_request_header ( if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ") - ACE_TEXT ("error while marshalling the Request header\n"), + "(%P|%t) - Transport[%d]::generate_request_header, " + "error while marshalling the Request header\n", this->id())); } @@ -401,7 +396,7 @@ TAO_Transport::make_idle (void) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), + "TAO (%P|%t) - Transport[%d]::make_idle\n", this->id ())); } @@ -425,7 +420,7 @@ TAO_Transport::handle_output (void) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"), + "TAO (%P|%t) - Transport[%d]::handle_output\n", this->id ())); } @@ -437,8 +432,8 @@ TAO_Transport::handle_output (void) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") - ACE_TEXT ("drain_queue returns %d/%d\n"), + "TAO (%P|%t) - Transport[%d]::handle_output, " + "drain_queue returns %d/%d\n", this->id (), retval, errno)); } @@ -487,14 +482,21 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, if (n == -1) { synch_message.remove_from_list (this->head_, this->tail_); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); return -1; // Error while sending... } else if (n == 1) { + ACE_ASSERT (synch_message.all_data_sent ()); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); bytes_transferred = total_length; return 1; // Empty queue, message was sent.. } + ACE_ASSERT (n == 0); // Some data sent, but data remains. + // Remove the temporary message from the queue... synch_message.remove_from_list (this->head_, this->tail_); @@ -514,7 +516,7 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int const n = + int n = this->send_synch_message_helper_i (synch_message, max_wait_time); @@ -523,6 +525,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, return n; } + ACE_ASSERT (n == 0); + // @todo: Check for timeouts! // if (max_wait_time != 0 && errno == ETIME) return -1; TAO_Flushing_Strategy *flushing_strategy = @@ -583,14 +587,21 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") - ACE_TEXT ("error while flushing message - %m\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, " + "error while flushing message - %m\n", + this->id ())); } return -1; } + else + { + ACE_ASSERT (synch_message.all_data_sent () != 0); + } + + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); return 1; } @@ -605,7 +616,7 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int const n = + int n = this->send_synch_message_helper_i (synch_message, max_wait_time); @@ -614,12 +625,14 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, return n; } + ACE_ASSERT (n == 0); + if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") - ACE_TEXT ("preparing to add to queue before leaving \n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::send_reply_message_i, " + "preparing to add to queue before leaving \n", + this->id ())); } // Till this point we shouldn't have any copying and that is the @@ -638,19 +651,7 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); - int result = flushing_strategy->schedule_output (this); - - if (result == -1) - { - if (TAO_debug_level > 5) - { - ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" - "message_i dequeuing msg due to schedule_output " - "failure\n", this->id ())); - } - msg->remove_from_list (this->head_, this->tail_); - msg->destroy (); - } + (void) flushing_strategy->schedule_output (this); return 1; } @@ -665,15 +666,24 @@ TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_mess if (n == -1) { synch_message.remove_from_list (this->head_, this->tail_); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); return -1; // Error while sending... } else if (n == 1) { + ACE_ASSERT (synch_message.all_data_sent ()); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); return 1; // Empty queue, message was sent.. } + ACE_ASSERT (n == 0); // Some data sent, but data remains. + if (synch_message.all_data_sent ()) { + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); return 1; } @@ -693,32 +703,11 @@ TAO_Transport::schedule_output_i (void) ACE_Event_Handler *eh = this->event_handler_i (); ACE_Reactor *reactor = eh->reactor (); - // Check to see if our event handler is still registered with the - // reactor. It's possible for another thread to have run close_connection() - // since we last used the event handler. - ACE_Event_Handler *found = reactor->find_handler (eh->get_handle ()); - if (found != eh) - { - if(TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::schedule_output_i " - "event handler not found in reactor, returning -1\n", - this->id ())); - } - if (found) - { - found->remove_reference (); - } - return -1; - } - found->remove_reference (); - if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::schedule_output_i\n", + this->id ())); } return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); @@ -733,8 +722,8 @@ TAO_Transport::cancel_output_i (void) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::cancel_output_i\n", + this->id ())); } return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); @@ -747,9 +736,9 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ") - ACE_TEXT ("timer expired\n"), - this->id ())); + "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, " + "timer expired\n", + this->id ())); } /// This is the only legal ACT in the current configuration.... @@ -776,7 +765,7 @@ int TAO_Transport::drain_queue (void) { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - int const retval = this->drain_queue_i (); + int retval = this->drain_queue_i (); if (retval == 1) { @@ -799,7 +788,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) size_t byte_count = 0; // ... send the message ... - ssize_t const retval = + ssize_t retval = this->send (iov, iovcnt, byte_count); if (TAO_debug_level == 5) @@ -819,9 +808,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") - ACE_TEXT ("send() returns 0\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "send() returns 0\n", + this->id ())); } return -1; } @@ -830,9 +819,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") - ACE_TEXT ("error during %p\n"), - this->id (), ACE_TEXT ("send()"))); + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "error during %p\n", + this->id (), ACE_TEXT ("send()"))); } if (errno == EWOULDBLOCK || errno == EAGAIN) @@ -845,6 +834,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) // ... start over, how do we guarantee progress? Because if // no bytes are sent send() can only return 0 or -1 + ACE_ASSERT (byte_count != 0); // Total no. of bytes sent for a send call this->sent_byte_count_ += byte_count; @@ -852,9 +842,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") - ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), - this->id(), byte_count, (this->head_ == 0))); + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "byte_count = %d, head_is_empty = %d\n", + this->id(), byte_count, (this->head_ == 0))); } return 1; @@ -896,9 +886,9 @@ TAO_Transport::drain_queue_i (void) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") - ACE_TEXT ("helper retval = %d\n"), - this->id (), retval)); + "TAO (%P|%t) - Transport[%d]::drain_queue_i, " + "helper retval = %d\n", + this->id (), retval)); } if (retval != 1) @@ -921,9 +911,9 @@ TAO_Transport::drain_queue_i (void) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") - ACE_TEXT ("helper retval = %d\n"), - this->id (), retval)); + "TAO (%P|%t) - Transport[%d]::drain_queue_i, " + "helper retval = %d\n", + this->id (), retval)); } if (retval != 1) @@ -954,9 +944,9 @@ TAO_Transport::cleanup_queue_i () if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") - ACE_TEXT ("cleaning up complete queue\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::cleanup_queue_i, " + "cleaning up complete queue\n", + this->id ())); } // Cleanup all messages @@ -967,7 +957,7 @@ TAO_Transport::cleanup_queue_i () // @@ This is a good point to insert a flag to indicate that a // CloseConnection message was successfully received. i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, - this->orb_core_->leader_follower ()); + this->orb_core_->leader_follower ()); i->remove_from_list (this->head_, this->tail_); @@ -985,9 +975,9 @@ TAO_Transport::cleanup_queue (size_t byte_count) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") - ACE_TEXT ("byte_count = %d\n"), - this->id (), byte_count)); + "TAO (%P|%t) - Transport[%d]::cleanup_queue, " + "byte_count = %d\n", + this->id (), byte_count)); } // Update the state of the first message @@ -996,10 +986,10 @@ TAO_Transport::cleanup_queue (size_t byte_count) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") - ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), - this->id (), byte_count, i->all_data_sent (), - i->message_length ())); + "TAO (%P|%t) - Transport[%d]::cleanup_queue, " + "after transfer, bc = %d, all_sent = %d, ml = %d\n", + this->id (), byte_count, i->all_data_sent (), + i->message_length ())); } // ... if all the data was sent the message must be removed from @@ -1068,9 +1058,9 @@ TAO_Transport::report_invalid_event_handler (const char *caller) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler") - ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"), - this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_)); + "TAO (%P|%t) - Transport[%d]::report_invalid_event_handler" + "(%s) no longer associated with handler [tag=%d]\n", + this->id (), ACE_TEXT_TO_TCHAR_IN (caller), this->tag_)); } } @@ -1124,7 +1114,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, { // Let's figure out if the message should be queued without trying // to send first: - bool try_sending_first = true; + bool try_sending_first = 1; const bool queue_empty = (this->head_ == 0); @@ -1148,9 +1138,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") - ACE_TEXT ("trying to send the message (ml = %d)\n"), - this->id (), total_length)); + "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, " + "trying to send the message (ml = %d)\n", + this->id (), total_length)); } // @@ I don't think we want to hold the mutex here, however if @@ -1172,10 +1162,10 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") - ACE_TEXT ("fatal error in ") - ACE_TEXT ("send_message_block_chain_i - %m\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, " + "fatal error in " + "send_message_block_chain_i - %m\n", + this->id ())); } return -1; } @@ -1195,9 +1185,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") - ACE_TEXT ("partial send %d / %d bytes\n"), - this->id (), byte_count, total_length)); + "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, " + "partial send %d / %d bytes\n", + this->id (), byte_count, total_length)); } // ... part of the data was sent, need to figure out what piece @@ -1209,6 +1199,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, // ... at least some portion of the message block chain should // remain ... + ACE_ASSERT (message_block != 0); } // ... either the message must be queued or we need to queue it @@ -1217,18 +1208,18 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") - ACE_TEXT ("message is queued\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, " + "message is queued\n", + this->id ())); } if (this->queue_message_i(message_block) == -1) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") - ACE_TEXT ("cannot queue message for ") - ACE_TEXT (" - %m\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, " + "cannot queue message for " + " - %m\n", + this->id ())); return -1; } @@ -1287,13 +1278,13 @@ TAO_Transport::queue_message_i(const ACE_Message_Block *message_block) int TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Time_Value * max_wait_time, - int /* block */ /* deprecated parameter */ ) + int /*block*/) { if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::handle_input\n", + this->id ())); } // First try to process messages of the head of the incoming queue. @@ -1306,768 +1297,853 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") - ACE_TEXT ("error while parsing the head of the queue\n"), - this->id())); - + "TAO (%P|%t) - Transport[%d]::handle_input, " + "error while parsing the head of the queue\n", + this->id())); } - return -1; } - else - { - // retval == 0 - // Processed a message in queue successfully. This - // thread must return to thread-pool now. - return 0; - } + return retval; } - TAO_Queued_Data *q_data = 0; + // If there are no messages then we can go ahead to read from the + // handle for further reading.. + + // The buffer on the stack which will be used to hold the input + // messages + char buf [TAO_MAXBUFSIZE]; + +#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) + (void) ACE_OS::memset (buf, + '\0', + sizeof buf); +#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ + + // Create a data block + ACE_Data_Block db (sizeof (buf), + ACE_Message_Block::MB_DATA, + buf, + this->orb_core_->input_cdr_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->input_cdr_dblock_allocator ()); + + // Create a message block + ACE_Message_Block message_block (&db, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->input_cdr_msgblock_allocator ()); - if (this->incoming_message_stack_.top (q_data) != -1 - && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED) + + // Align the message block + ACE_CDR::mb_align (&message_block); + + size_t recv_size = 0; + + if (this->orb_core_->orb_params ()->single_read_optimization ()) { - /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */ - if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") - ACE_TEXT ("error consolidating incoming message\n"), - this->id ())); - } - return -1; - } + recv_size = + message_block.space (); } else { - if (this->handle_input_parse_data (rh, max_wait_time) == -1) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") - ACE_TEXT ("error parsing incoming message\n"), - this->id ())); - } - return -1; - } + recv_size = + this->messaging_object ()->header_length (); } - return 0; -} - -int -TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data, - TAO_Resume_Handle &rh) -{ - // paranoid check - if (q_data->missing_data_ != 0) + // If we have a partial message, copy it into our message block + // and clear out the partial message. + if (this->partial_message_ != 0 && this->partial_message_->length () != 0) { - if (TAO_debug_level > 0) + if (message_block.copy (this->partial_message_->rd_ptr (), + this->partial_message_->length ()) == 0) { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") - ACE_TEXT ("missing data\n"), - this->id ())); + recv_size -= this->partial_message_->length (); + this->partial_message_->reset (); } - return -1; - } - - if (q_data->more_fragments_ || - q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - // consolidate message on top of stack, only for fragmented messages - TAO_Queued_Data *new_q_data = 0; - - switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) + else { - case -1: // error - return -1; + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Transport[%d]::handle_input, " + "unable to copy the partial message\n", + this->id ()), + -1); + } + } - case 0: // returning consolidated message in q_data - if (!new_q_data) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") - ACE_TEXT ("error, consolidated message is NULL\n"), - this->id ())); - } - return -1; - } + // Saving the size of the received buffer in case any one needs to + // get the size of the message thats received in the + // context. Obviously the value will be changed for each recv call + // and the user is supposed to invoke the accessor only in the + // invocation context to get meaningful information. + this->recv_buffer_size_ = recv_size; + // Read the message into the message block that we have created on + // the stack. + ssize_t n = this->recv (message_block.wr_ptr (), + recv_size, + max_wait_time); - if (this->process_parsed_messages (new_q_data, rh) == -1) - { - TAO_Queued_Data::release (new_q_data); + // If there is an error return to the reactor.. + if (n <= 0) + { + return n; + } - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") - ACE_TEXT ("error processing consolidated message\n"), - this->id ())); - } - return -1; - } + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_input, " + "read %d bytes\n", + this->id (), n)); + } - TAO_Queued_Data::release (new_q_data); + // Set the write pointer in the stack buffer + message_block.wr_ptr (n); - break; + // Parse the message and try consolidating the message if + // needed. + retval = this->parse_consolidate_messages (message_block, + rh, + max_wait_time); - case 1: // fragment has been stored in messaging_oject() - break; + if (retval <= 0) + { + if (retval == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_input, " + "error while parsing and consolidating\n", + this->id ())); } + return retval; } - else + + if (message_block.length () > 0) { - if (this->process_parsed_messages (q_data, rh) == -1) + // Make a node of the message block.. + TAO_Queued_Data qd (&message_block, + this->orb_core_->transport_message_buffer_allocator ()); + + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); + + // Check whether the message was fragmented.. + if (qd.more_fragments_ || + (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) { - TAO_Queued_Data::release (q_data); + // Duplicate the node that we have as the node is on stack.. + TAO_Queued_Data *nqd = + TAO_Queued_Data::duplicate (qd); - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") - ACE_TEXT ("error processing message\n"), - this->id ())); - } - return -1; + return this->consolidate_fragments (nqd, rh); } - TAO_Queued_Data::release (q_data); - + // Process the message + return this->process_parsed_messages (&qd, + rh); } return 0; } int -TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data) +TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - // consolidate message on top of stack, only for fragmented messages - - // paranoid check - if (q_data->missing_data_ != 0) + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + switch (this->parse_incoming_messages (block)) { - return -1; - } + // An error has occurred during message parsing + case -1: + return -1; - if (q_data->more_fragments_ || - q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - TAO_Queued_Data *new_q_data = 0; + // This message block does not contain enough data to + // parse the header. We do not need to grow the partial + // message block since we are guaranteed that it can hold + // at least a GIOP header plus a GIOP fragment header. + case 1: + if (this->partial_message_ == 0) + { + this->allocate_partial_message_block (); + } - switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) + if (this->partial_message_ != 0 && + this->partial_message_->copy (block.rd_ptr (), + block.length ()) == 0) { - case -1: // error - return -1; + block.rd_ptr (block.length ()); + return 0; + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Transport[%d]::parse_consolidate_messages, " + "unable to save the partial message\n", + this->id ()), + -1); + } - case 0: // returning consolidated message in new_q_data - if (!new_q_data) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ") - ACE_TEXT ("error, consolidated message is NULL\n"), - this->id ())); - } - return -1; - } + case 0: // The normal case + break; - if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0) - { - TAO_Queued_Data::release (new_q_data); - return -1; - } - break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Transport[%d]::parse_consolidate_messages, " + "impossible return value from parse_incoming_messages\n", + this->id ()), + -1); + } - case 1: // fragment has been stored in messaging_oject() - break; - } + // Check whether we have a complete message for processing + const ssize_t missing_data = this->missing_data (block); + + if (missing_data < 0) + { + // If we have more than one message + return this->consolidate_extra_messages (block, + rh); } - else + else if (missing_data > 0) { - if (this->incoming_message_queue_.enqueue_tail (q_data) != 0) - { - TAO_Queued_Data::release (q_data); - return -1; - } + // If we have missing data then try doing a read or try queueing + // them. + return this->consolidate_message (block, + missing_data, + rh, + max_wait_time); } - return 0; // success + return 1; } int -TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh, - ACE_Time_Value * max_wait_time, - TAO_Queued_Data *q_data) +TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) { - // paranoid check - if (q_data == 0) - { - return -1; - } + // If we have a queue and if the last message is not complete a + // complete one, then this read will get us the remaining data. So + // do not try to parse the header if we have an incomplete message + // in the queue. + if (this->incoming_message_queue_.is_tail_complete () != 0) + { + // As it looks like a new message has been read, process the + // message. Call the messaging object to do the parsing.. + int retval = + this->messaging_object ()->parse_incoming_messages (block); + + if (retval == -1 && TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " + "error in incoming message\n", + this->id ())); + } - if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") - ACE_TEXT ("enter (missing data == %d)\n"), - this->id (), q_data->missing_data_)); + return retval; } - const size_t recv_size = q_data->missing_data_; + return 0; +} - // make sure the message_block has enough space - const size_t message_size = recv_size - + q_data->msg_block_->length(); - if (q_data->msg_block_->space() < recv_size) +ssize_t +TAO_Transport::missing_data (ACE_Message_Block &incoming) +{ + // If we have a incomplete message in the queue then find out how + // much of data is required to get a complete message. + if (this->incoming_message_queue_.is_tail_complete () == 0) { - if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1) - { - return -1; - } + return this->incoming_message_queue_.missing_data_tail (); } - // Saving the size of the received buffer in case any one needs to - // get the size of the message thats received in the - // context. Obviously the value will be changed for each recv call - // and the user is supposed to invoke the accessor only in the - // invocation context to get meaningful information. - this->recv_buffer_size_ = recv_size; - - // Read the message into the existing message block on heap - const ssize_t n = this->recv (q_data->msg_block_->wr_ptr(), - recv_size, - max_wait_time); + return this->messaging_object ()->missing_data (incoming); +} - if (n <= 0) +int +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Check whether the last message in the queue is complete.. + if (this->incoming_message_queue_.is_tail_complete () == 0) { - return n; + return this->consolidate_message_queue (incoming, + missing_data, + rh, + max_wait_time); } - if (TAO_debug_level > 3) + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") - ACE_TEXT ("read bytes %d\n"), - this->id (), n)); + "TAO (%P|%t) - Transport[%d]::consolidate_message\n", + this->id ())); } - q_data->msg_block_->wr_ptr(n); - q_data->missing_data_ -= n; + // Calculate the actual length of the load that we are supposed to + // read which is equal to the <missing_data> + length of the buffer + // that we have.. + const size_t payload = missing_data + incoming.size (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); - if (q_data->missing_data_ == 0) + ssize_t n = 0; + + // As this used for transports where things are available in one + // shot this looping should not create any problems. + for (ssize_t bytes = missing_data; bytes != 0; bytes -= n) { - // paranoid check - if (this->incoming_message_stack_.pop (q_data) == -1) + // .. do a read on the socket again. + n = this->recv (incoming.wr_ptr (), + bytes, + max_wait_time); + + if (TAO_debug_level > 6) { - return -1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "read %d bytes on attempt\n", + this->id(), n)); } - if (this->consolidate_process_message (q_data, rh) == -1) + if (n == 0 || n == -1) { - return -1; + break; } - } - - return 0; -} - -int -TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block) -{ - - // store buffer status of last extraction: -1 parse error, 0 - // incomplete message header in buffer, 1 complete messages header - // parsed - int buf_status = 0; - - TAO_Queued_Data *q_data = 0; // init + incoming.wr_ptr (n); + missing_data -= n; + } - // parse buffer until all messages have been extracted, consolidate - // and enqueue complete messages, if the last message being parsed - // has missin data, it is stays on top of incoming_message_stack. - while (message_block.length () > 0 && - (buf_status = this->messaging_object ()->extract_next_message - (message_block, q_data)) != -1 && - q_data != 0) // paranoid check + // If we got an error.. + if (n == -1) { - if (q_data->missing_data_ == 0) - { - if (this->consolidate_enqueue_message (q_data) == -1) - { - return -1; - } - } - else // incomplete message read, probably the last message in buffer + if (TAO_debug_level > 4) { - // can not fail - this->incoming_message_stack_.push (q_data); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "error while trying to consolidate\n", + this->id ())); } - q_data = 0; // reset - } // while - - if (buf_status == -1) - { return -1; } - return 0; -} - -int -TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh, - ACE_Time_Value * max_wait_time) -{ + // If we had gotten a EWOULDBLOCK n would be equal to zero. But we + // have to put the message in the queue anyway. So let us proceed + // to do that and return... - if (TAO_debug_level > 3) + // Check to see if we have messages in queue or if we have missing + // data . AT this point we cannot have have semi-complete messages + // in the queue as they would have been taken care before. Put + // ourselves in the queue and then try processing one of the + // messages.. + if (missing_data >= 0 || + this->incoming_message_queue_.queue_length () != 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") - ACE_TEXT ("enter\n"), - this->id ())); - } + if (missing_data == 0 || + !this->incoming_message_queue_.is_tail_fragmented ()) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "queueing up the message\n", + this->id ())); + } + // Get a queued data + TAO_Queued_Data *qd = + this->make_queued_data (incoming); - // The buffer on the stack which will be used to hold the input - // messages, ACE_CDR::MAX_ALIGNMENT compensates the - // memory-alignment. This improves performance with SUN-Java-ORB-1.4 - // and higher that sends fragmented requests of size 1024 bytes. - char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT]; + // Add the missing data to the queue + qd->missing_data_ = missing_data; -#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) - (void) ACE_OS::memset (buf, - '\0', - sizeof buf); -#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ - - // Create a data block - ACE_Data_Block db (sizeof (buf), - ACE_Message_Block::MB_DATA, - buf, - this->orb_core_->input_cdr_buffer_allocator (), - this->orb_core_->locking_strategy (), - ACE_Message_Block::DONT_DELETE, - this->orb_core_->input_cdr_dblock_allocator ()); - - // Create a message block - ACE_Message_Block message_block (&db, - ACE_Message_Block::DONT_DELETE, - this->orb_core_->input_cdr_msgblock_allocator ()); + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + // If this is a full GIOP fragment, then we need only + // to consolidate the fragments + if (missing_data == 0 && + (qd->more_fragments_ || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + this->consolidate_fragments (qd, rh); + } + else + { + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); - // Align the message block - ACE_CDR::mb_align (&message_block); + if (this->incoming_message_queue_.is_head_complete ()) + { + return this->process_queue_head (rh); + } + } + } + else + { + // This block of code will only come into play when GIOP + // message fragmentation is employed. If we have a fragment + // in the message queue, we can only chain message blocks + // onto the TAO_Queued_Data for that fragment. Unless we have + // a full GIOP fragment, and since we know we're missing data, + // we need to save what we have until we can read in some more of + // the fragment until we get it all. This bit of data could be + // larger than what the partial message block can hold, so we may + // need to grow the partial message block. + if (this->partial_message_ == 0) + { + this->allocate_partial_message_block (); + } - size_t recv_size = 0; // Note: unsigned integer + if (this->partial_message_ != 0) + { + const size_t incoming_length = incoming.length (); + ACE_CDR::grow (this->partial_message_, + incoming_length); + if (this->partial_message_->copy (incoming.rd_ptr (), + incoming_length) == 0) + { + incoming.rd_ptr (incoming_length); + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "unable to save the partial message\n", + this->id ()), + -1); + } + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "unable to allocate the partial message\n", + this->id ()), + -1); + } + } - // Pointer to newly parsed message - TAO_Queued_Data *q_data = 0; + return 0; + } - // optimizing access of constants - const size_t header_length = - this->messaging_object ()->header_length (); + // We don't have any missing data. Just make a queued_data node with + // the existing message block and send it to the higher layers of + // the ORB. + TAO_Queued_Data pqd (&incoming, + this->orb_core_->transport_message_buffer_allocator ()); + pqd.missing_data_ = missing_data; + this->messaging_object ()->get_message_data (&pqd); - // paranoid check - if (header_length > message_block.space ()) + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (pqd.more_fragments_ || + (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) { - return -1; - } + // Duplicate the queued data as it is on stack.. + TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); - if (this->orb_core_->orb_params ()->single_read_optimization ()) - { - recv_size = - message_block.space (); + return this->consolidate_fragments (nqd, rh); } - else + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (&pqd, + rh); +} + +int +TAO_Transport::consolidate_fragments (TAO_Queued_Data *queueable_message, + TAO_Resume_Handle &rh) +{ + // Get the version numbers + CORBA::Octet major = queueable_message->major_version_; + CORBA::Octet minor = queueable_message->minor_version_; + CORBA::UShort whole = major << 8 | minor; + + switch(whole) { - // Single read optimization has been de-activated. That means - // that we need to read from transport the GIOP header first - // before the payload. This codes first checks the incoming - // stack for partial messages which needs to be - // consolidated. Otherwise we are in new cycle, reading complete - // GIOP header of new incoming message. - if (this->incoming_message_stack_.top (q_data) != -1 - && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED) + case 0x0100: + if (!queueable_message->more_fragments_) { - // There is a partial message on incoming_message_stack_ - // whose length is unknown so far. We need to consolidate - // the GIOP header to get to know the payload size, - recv_size = header_length - q_data->msg_block_->length (); + this->incoming_message_queue_.enqueue_tail (queueable_message); } else { - // Read amount of data forming GIOP header of new incoming - // message. - recv_size = header_length; + // Fragments aren't supported in 1.0. This is an error and + // we should reject it somehow. What do we do here? Do we throw + // an exception to the receiving side? Do we throw an exception + // to the sending side? + // + // At the very least, we need to log the fact that we received + // nonsense. + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("TAO (%P|%t) - ") + ACE_TEXT("TAO_Transport::enqueue_incoming_message ") + ACE_TEXT("detected a fragmented GIOP 1.0 message\n")), + -1); } - // POST: 0 <= recv_size <= header_length + break; + case 0x0101: + { + // One note is that TAO_Queued_Data contains version numbers, + // but doesn't indicate the actual protocol to which those + // version numbers refer. That's not a problem, though, because + // instances of TAO_Queued_Data live in a queue, and that queue + // lives in a particular instance of a Transport, and the + // transport instance has an association with a particular + // messaging_object. The concrete messaging object embodies a + // messaging protocol, and must cover all versions of that + // protocol. Therefore, we just need to cover the bases of all + // versions of that one protocol. + + // In 1.1, fragments kinda suck because they don't have they're + // own message-specific header. Therefore, we have to find the + // fragment based on the major and minor version. + TAO_Queued_Data* fragment_message_chain = + this->incoming_message_queue_.find_fragment_chain (major, minor); + + // Deal with the fragment and the queueable message + this->process_fragment (fragment_message_chain, + queueable_message, + major, minor, rh); + break; + } + case 0x0102: + { + // In 1.2, we get a little more context. There's a + // FRAGMENT message-specific header, and inside that is the + // request id with which the fragment is associated. + TAO_Queued_Data* fragment_message_chain = + this->incoming_message_queue_.find_fragment_chain ( + queueable_message->request_id_); + + // Deal with the fragment and the queueable message + this->process_fragment (fragment_message_chain, + queueable_message, + major, minor, rh); + break; + } + default: + ACE_ERROR ((LM_ERROR, + ACE_TEXT("TAO (%P|%t) - ") + ACE_TEXT("TAO_Transport::consolidate_fragments ") + ACE_TEXT("can not handle a GIOP %d.%d ") + ACE_TEXT("message\n"), major, minor)); + ACE_HEX_DUMP ((LM_DEBUG, + queueable_message->msg_block_->rd_ptr (), + queueable_message->msg_block_->length ())); + return -1; } - // POST: 0 <= recv_size <= message_block->space () - // If we have a partial message, copy it into our message block and - // clear out the partial message. - if (this->partial_message_ != 0 && this->partial_message_->length () > 0) + return 0; +} + +void +TAO_Transport::process_fragment (TAO_Queued_Data* fragment_message_chain, + TAO_Queued_Data* queueable_message, + CORBA::Octet major, + CORBA::Octet minor, + TAO_Resume_Handle &rh) +{ + // No fragment was found + if (fragment_message_chain == 0) { - // (*) Copy back the partial message into current read-buffer, - // verify that the read-strategy of "recv_size" bytes is not - // exceeded. The latter check guarantees that recv_size does not - // roll-over and keeps in range - // 0<=recv_size<=message_block->space() - if (this->partial_message_->length () <= recv_size && - message_block.copy (this->partial_message_->rd_ptr (), - this->partial_message_->length ()) == 0) + this->incoming_message_queue_.enqueue_tail (queueable_message); + } + else + { + if (fragment_message_chain->major_version_ != major || + fragment_message_chain->minor_version_ != minor) + ACE_ERROR ((LM_ERROR, + ACE_TEXT("TAO (%P|%t) - ") + ACE_TEXT("TAO_Transport::process_fragment ") + ACE_TEXT("GIOP versions do not match ") + ACE_TEXT("(%d.%d != %d.%d\n"), + fragment_message_chain->major_version_, + fragment_message_chain->minor_version_, + major, minor)); + + // Find the last message block in the continuation + ACE_Message_Block* mb = fragment_message_chain->msg_block_; + while (mb->cont () != 0) + mb = mb->cont (); + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the header(s) + const size_t header_adjustment = + this->messaging_object ()->header_length () + + this->messaging_object ()->fragment_header_length (major, minor); + queueable_message->msg_block_->rd_ptr(header_adjustment); + mb->cont (queueable_message->msg_block_); + + // Remove our reference to the message block. At this point + // the message block of the fragment head owns it as part of a + // chain + queueable_message->msg_block_ = 0; + + if (!queueable_message->more_fragments_) { + // This is the end of the fragments for this request + fragment_message_chain->consolidate (); - recv_size -= this->partial_message_->length (); - this->partial_message_->reset (); - } - else - { - return -1; + // Process the queue head to make sure that the newly + // consolidated fragments get handled + this->process_queue_head (rh); } + + // Get rid of the queuable message + TAO_Queued_Data::release (queueable_message); } - // POST: 0 <= recv_size <= buffer_space +} - if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0 +int +TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + if (TAO_debug_level > 4) { - // This event would cause endless looping, trying frequently to - // read zero bytes from stream. This might happen, if TAOs - // protocol implementation is not correct and tries to read data - // beyond header without "single_read_optimazation" being - // activated. - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") - ACE_TEXT ("Error - endless loop detection, closing connection"), - this->id ())); - } - return -1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", + this->id ())); } - // Saving the size of the received buffer in case any one needs to - // get the size of the message thats received in the - // context. Obviously the value will be changed for each recv call - // and the user is supposed to invoke the accessor only in the - // invocation context to get meaningful information. - this->recv_buffer_size_ = recv_size; - - // Read the message into the message block that we have created on - // the stack. - const ssize_t n = this->recv (message_block.wr_ptr (), - recv_size, - max_wait_time); + // If the queue did not have a complete message put this piece of + // message in the queue. We know it did not have a complete + // message. That is why we are here. + const size_t n = + this->incoming_message_queue_.copy_tail (incoming); - // If there is an error return to the reactor.. - if (n <= 0) + if (TAO_debug_level > 6) { - return n; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "copied [%d] bytes to the tail\n", + this->id (), + n)); } - if (TAO_debug_level > 3) + // Update the missing data... + missing_data = + this->incoming_message_queue_.missing_data_tail (); + + if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") - ACE_TEXT ("read %d bytes\n"), - this->id (), n)); + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "missing [%d] bytes in the tail message\n", + this->id (), + missing_data)); } - // Set the write pointer in the stack buffer - message_block.wr_ptr (n); - - // - // STACK PROCESSING OR MESSAGE CONSOLIDATION - // - - // PRE: data in buffer is aligned && message_block.length() > 0 + // Move the read pointer of the <incoming> message block to the end + // of the copied message and process the remaining portion... + incoming.rd_ptr (n); - if (this->incoming_message_stack_.top (q_data) != -1 - && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED) + // If we have some more information left in the message block.. + if (incoming.length ()) { - // - // MESSAGE CONSOLIDATION - // + // We may have to parse & consolidate. This part of the message + // doesn't seem to be part of the last message in the queue (as + // the copy () hasn't taken away this message). + const int retval = this->parse_consolidate_messages (incoming, + rh, + max_wait_time); - // Partial message on incoming_message_stack_ needs to be - // consolidated. The message header could not be parsed so far - // and therefor the message size is unknown yet. Consolidating - // the message destroys the memory alignment of succeeding - // messages sharing the buffer, for that reason consolidation - // and stack based processing are mutial exclusive. - if (this->messaging_object ()->consolidate_node (q_data, - message_block) == -1) + // If there is an error return + if (retval == -1) { - if (TAO_debug_level > 0) + if (TAO_debug_level) { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") - ACE_TEXT ("error consolidating message from input buffer\n"), - this->id () )); - } - return -1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "error while consolidating, part of the read message\n", + this->id ())); + } + return retval; } - - // Complete message are to be enqueued and later processed - if (q_data->missing_data_ == 0) + else if (retval == 1) { - if (this->incoming_message_stack_.pop (q_data) == -1) - { - return -1; - } + // If the message in the <incoming> message block has only + // one message left we need to process that seperately. + + // Get a queued data + TAO_Queued_Data *qd = this->make_queued_data (incoming); + + // Get the rest of the message data + this->messaging_object ()->get_message_data (qd); - if (this->consolidate_enqueue_message (q_data) == -1) + // Add the missing data to the queue + qd->missing_data_ = 0; + + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (qd->more_fragments_ + || (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) { - return -1; + return this->consolidate_fragments (qd, rh); } - } - if (message_block.length () > 0 - && this->handle_input_parse_extra_messages (message_block) == -1) - { - return -1; - } + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); - // In any case try to process the enqueued messages - if (this->process_queue_head (rh) == -1) - { - return -1; + // We should surely have a message in queue now. So just + // process that. + return this->process_queue_head (rh); } - } - else - { - // - // STACK PROCESSING (critical path) - // - // Process the first message in buffer on stack + // parse_consolidate_messages () would have processed one of the + // messages, so we better return as we dont want to starve other + // threads. + return 0; + } - // (PRE: first message resides in aligned memory) Make a node of - // the message-block.. + // If we still have some missing data.. + if (missing_data > 0) + { + // Get the last message from the Queue + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_tail (); - TAO_Queued_Data qd (&message_block, - this->orb_core_->transport_message_buffer_allocator ()); + if (TAO_debug_level > 5) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "trying recv, again\n", + this->id ())); + } - size_t mesg_length = 0; + // Try to do a read again. If we have some luck it would be + // great.. + const ssize_t n = this->recv (qd->msg_block_->wr_ptr (), + missing_data, + max_wait_time); - if (this->messaging_object ()->parse_next_message (message_block, - qd, - mesg_length) == -1 - || (qd.missing_data_ == 0 - && mesg_length > message_block.length ()) ) + if (TAO_debug_level > 5) { - // extracting message failed - return -1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "recv retval [%d]\n", + this->id (), + n)); } - // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length() - // This prevents seeking rd_ptr behind the wr_ptr - if (qd.missing_data_ != 0 || - qd.more_fragments_ || - qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + // Error... + if (n < 0) { - if (qd.missing_data_ == 0) - { - // Dealing with a fragment - TAO_Queued_Data *nqd = - TAO_Queued_Data::duplicate (qd); - - if (nqd == 0) - { - return -1; - } + return n; + } - // mark the end of message in new buffer - char* end_mark = nqd->msg_block_->rd_ptr () - + mesg_length; - nqd->msg_block_->wr_ptr (end_mark); + // If we get a EWOULDBLOCK ie. n==0, we should anyway put the + // message in queue before returning.. + // Move the write pointer + qd->msg_block_->wr_ptr (n); - // move the read pointer forward in old buffer - message_block.rd_ptr (mesg_length); + // Decrement the missing data + qd->missing_data_ -= n; - // enqueue the message - if (this->consolidate_enqueue_message (nqd) == -1) - { - return -1; - } + // Now put the TAO_Queued_Data back in the queue + this->incoming_message_queue_.enqueue_tail (qd); - if (message_block.length () > 0 - && this->handle_input_parse_extra_messages (message_block) == -1) - { - return -1; - } + // Any way as we have come this far and are about to return, + // just try to process a message if it is there in the queue. + if (this->incoming_message_queue_.is_head_complete ()) + { + return this->process_queue_head (rh); + } - // In any case try to process the enqueued messages - if (this->process_queue_head (rh) == -1) - { - return -1; - } - } - else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED) - { - // Incomplete message, must be the last one in buffer + return 0; + } - if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED && - qd.missing_data_ > message_block.space ()) - { - // Re-Allocate correct size on heap - if (ACE_CDR::grow (qd.msg_block_, - message_block.length () - + qd.missing_data_) == -1) - { - return -1; - } - } + // Process a message in the head of the queue if we have one.. + return this->process_queue_head (rh); +} - TAO_Queued_Data *nqd = - TAO_Queued_Data::duplicate (qd); - if (nqd == 0) - { - return -1; - } +int +TAO_Transport::consolidate_extra_messages (ACE_Message_Block + &incoming, + TAO_Resume_Handle &rh) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", + this->id ())); + } - // move read-pointer to end of buffer - message_block.rd_ptr (message_block.length()); + // Pick the tail of the queue + TAO_Queued_Data *tail = + this->incoming_message_queue_.dequeue_tail (); - this->incoming_message_stack_.push (nqd); - } - } - else + if (tail) + { + // If we have a node in the tail, checek to see whether it needs + // consolidation. If so, just consolidate it. + if (this->messaging_object ()->consolidate_node (tail, incoming) == -1) { - // - // critical path - // - - // We cant process the message on stack right now. First we - // have got to parse extra messages from message_block, - // putting them into queue. When this is done we can return - // to process this message, and notifying other threads to - // process the messages in queue. - - char * end_marker = message_block.rd_ptr () - + mesg_length; - - if (message_block.length () > mesg_length) - { - // There are more message in data stream to be parsed. - // Safe the rd_ptr to restore later. - char *rd_ptr_stack_mesg = message_block.rd_ptr (); - - // Skip parsed message, jump to next message in buffer - // PRE: mesg_length <= message_block.length () - message_block.rd_ptr (mesg_length); + return -1; + } - // Extract remaining messages and enqueue them for later - // heap processing - if (this->handle_input_parse_extra_messages (message_block) == -1) - { - return -1; - } + // .. put the tail back in queue.. + this->incoming_message_queue_.enqueue_tail (tail); + } - // correct the end_marker - end_marker = message_block.rd_ptr (); + int retval = 1; - // Restore rd_ptr - message_block.rd_ptr (rd_ptr_stack_mesg); - } + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " + "extracting extra messages\n", + this->id ())); + } - // The following if-else has been copied from - // process_queue_head(). While process_queue_head() - // processes message on heap, here we will process a message - // on stack. + // Extract messages.. + while (retval == 1) + { + TAO_Queued_Data *q_data = 0; - // Now that we have one message on stack to be processed, - // check whether we have one more message in the queue... - if (this->incoming_message_queue_.queue_length () > 0) + retval = + this->messaging_object ()->extract_next_message (incoming, + q_data); + if (q_data) + { + // If we have read a framented message then... + if (q_data->more_fragments_ || + q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") - ACE_TEXT ("notify reactor\n"), - this->id ())); - - } - - const int retval = this->notify_reactor (); - - if (retval == 1) - { - // Let the class know that it doesn't need to resume the - // handle.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); - } - else if (retval < 0) - return -1; + this->consolidate_fragments (q_data, rh); } else { - // As there are no further messages in queue just resume - // the handle. Set the flag incase someone had reset the flag.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); - } - - // PRE: incoming_message_queue is empty - if (this->process_parsed_messages (&qd, - rh) == -1) - { - return -1; + this->incoming_message_queue_.enqueue_tail (q_data); } - - // move the rd_ptr tp position of end_marker - message_block.rd_ptr (end_marker); } } - // Now that all cases have been processed, there might be kept some data - // in buffer that needs to be safed for next "handle_input" invocations. - if (message_block.length () > 0) - { - if (this->partial_message_ == 0) - { - this->allocate_partial_message_block (); - } - - if (this->partial_message_ != 0 && - this->partial_message_->copy (message_block.rd_ptr (), - message_block.length ()) == 0) - { - message_block.rd_ptr (message_block.length ()); - } - else - { - return -1; - } - } - - return 0; -} + // In case of error return.. + if (retval == -1) + { + return retval; + } + return this->process_queue_head (rh); +} int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) { - if (TAO_debug_level > 7) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") - ACE_TEXT ("entering (missing data == %d)\n"), - this->id(), qd->missing_data_)); - } - // Get the <message_type> that we have received const TAO_Pluggable_Message_Type t = qd->msg_type_; @@ -2077,9 +2153,9 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") - ACE_TEXT ("received CloseConnection message - %m\n"), - this->id())); + "TAO (%P|%t) - Transport[%d]::process_parsed_messages, " + "received CloseConnection message - %m\n", + this->id())); // Return a "-1" so that the next stage can take care of // closing connection and the necessary memory management. @@ -2113,46 +2189,22 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") - ACE_TEXT ("error in process_reply_message - %m\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::process_parsed_messages, " + "error in process_reply_message - %m\n", + this->id ())); return -1; } } - else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST) - { - // The associated request might be incomplpete residing - // fragmented in messaging object. We must make sure the - // resources allocated by fragments are released. - - if (this->messaging_object ()->discard_fragmented_message (qd) == -1) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") - ACE_TEXT ("error processing CancelRequest\n"), - this->id ())); - } - } - - // We are not able to cancel requests being processed already; - // this is declared as optional feature by CORBA, and TAO does - // not support this currently. - - // Just continue processing, CancelRequest does not mean to cut - // off the connection. - } else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) { - if (TAO_debug_level > 0) + if (TAO_debug_level) { ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") - ACE_TEXT ("received MessageError, closing connection\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::process_parsed_messages, " + "received MessageError, closing connection\n", + this->id ())); } return -1; } @@ -2161,18 +2213,68 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return 0; } +TAO_Queued_Data * +TAO_Transport::make_queued_data (ACE_Message_Block &incoming) +{ + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = + TAO_Queued_Data::make_queued_data ( + this->orb_core_->transport_message_buffer_allocator ()); + + // Get the flag for the details of the data block... + ACE_Message_Block::Message_Flags flg = + incoming.self_flags (); + + if (ACE_BIT_DISABLED (flg, + ACE_Message_Block::DONT_DELETE)) + { + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = ACE_Message_Block::duplicate (&incoming); + } + else + { + // As we are in CORBA mode, all the data blocks would be aligned + // on an 8 byte boundary. Hence create a data block for more + // than the actual length + ACE_Data_Block *db = + this->orb_core_->create_input_cdr_data_block (incoming.length ()+ + ACE_CDR::MAX_ALIGNMENT); + + // Get the allocator.. + ACE_Allocator *alloc = + this->orb_core_->input_cdr_msgblock_allocator (); + + // Make message block.. + ACE_Message_Block mb (db, + 0, + alloc); + + // Duplicate the block.. + qd->msg_block_ = mb.duplicate (); + + // Align the message block + ACE_CDR::mb_align (qd->msg_block_); + + // Copy the data.. + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + } + + return qd; +} + int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) { if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"), - this->id (), this->incoming_message_queue_.queue_length () )); + "TAO (%P|%t) - Transport[%d]::process_queue_head\n", + this->id ())); } - // See if message in queue ... - if (this->incoming_message_queue_.queue_length () > 0) + // See if the message in the head of the queue is complete... + if (this->incoming_message_queue_.is_head_complete () > 0) { // Get the message on the head of the queue.. TAO_Queued_Data *qd = @@ -2181,21 +2283,21 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") - ACE_TEXT ("the size of the queue is [%d]\n"), - this->id (), - this->incoming_message_queue_.queue_length())); + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "the size of the queue is [%d]\n", + this->id (), + this->incoming_message_queue_.queue_length())); } // Now that we have pulled out out one message out of the queue, // check whether we have one more message in the queue... - if (this->incoming_message_queue_.queue_length () > 0) + if (this->incoming_message_queue_.is_head_complete () > 0) { if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") - ACE_TEXT ("notify reactor\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "notify reactor\n", + this->id ())); } @@ -2248,9 +2350,9 @@ TAO_Transport::notify_reactor (void) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") - ACE_TEXT ("notify to Reactor\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::notify_reactor, " + "notify to Reactor\n", + this->id ())); } @@ -2263,9 +2365,9 @@ TAO_Transport::notify_reactor (void) // @@todo: need to think about what is the action that // we can take when we get here. ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") - ACE_TEXT ("notify to the reactor failed..\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::notify_reactor, " + "notify to the reactor failed..\n", + this->id ())); } return 1; @@ -2366,10 +2468,10 @@ TAO_Transport::post_open (size_t id) if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ") - ACE_TEXT ("could not register the transport ") - ACE_TEXT ("in the reactor.\n"), - this->id ())); + "TAO (%P|%t) - Transport[%d]::post_connect , " + "could not register the transport " + "in the reactor.\n", + this->id ())); return false; } @@ -2384,11 +2486,7 @@ TAO_Transport::allocate_partial_message_block (void) { // This value must be at least large enough to hold a GIOP message // header plus a GIOP fragment header - const size_t partial_message_size = - this->messaging_object ()->header_length (); - // + this->messaging_object ()->fragment_header_length (); - // deprecated, conflicts with not-single_read_opt. - + const size_t partial_message_size = 16; ACE_NEW (this->partial_message_, ACE_Message_Block (partial_message_size)); } @@ -2400,5 +2498,3 @@ TAO_Transport::allocate_partial_message_block (void) */ //@@ TAO_TRANSPORT_SPL_METHODS_ADD_HOOK - -TAO_END_VERSIONED_NAMESPACE_DECL |