diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 1208 |
1 files changed, 604 insertions, 604 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 1640644edf9..9923ff1f895 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -19,9 +19,7 @@ #include "Resume_Handle.h" #include "Codeset_Manager.h" #include "Codeset_Translator_Factory.h" -#include "GIOP_Message_State.h" #include "ace/OS_NS_sys_time.h" -#include "ace/Message_Block.h" #include "ace/Reactor.h" @@ -112,15 +110,12 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , head_ (0) , tail_ (0) , incoming_message_queue_ (orb_core) - , uncompleted_message_ (0) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) , id_ ((size_t) this) , purging_order_ (0) - , recv_buffer_size_ (0) - , sent_byte_count_ (0) , char_translator_ (0) , wchar_translator_ (0) , tcs_set_ (0) @@ -241,9 +236,13 @@ TAO_Transport::generate_request_header ( { // codeset service context is only supposed to be sent in the first request // on a particular connection. - TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager(); - if (csm && this->first_request_) - csm->generate_service_context( opdetails, *this ); + if (this->first_request_) + { + this->orb_core ()->codeset_manager ()->generate_service_context ( + opdetails, + *this + ); + } if (this->messaging_object ()->generate_request_header (opdetails, spec, @@ -605,12 +604,7 @@ int TAO_Transport::schedule_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; if (TAO_debug_level > 3) { @@ -626,12 +620,7 @@ int TAO_Transport::cancel_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; if (TAO_debug_level > 3) { @@ -750,9 +739,6 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) // no bytes are sent send() can only return 0 or -1 ACE_ASSERT (byte_count != 0); - // Total no. of bytes sent for a send call - this->sent_byte_count_ += byte_count; - if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, @@ -776,10 +762,6 @@ TAO_Transport::drain_queue_i (void) // We loop over all the elements in the queue ... TAO_Queued_Message *i = this->head_; - // reset the value so that the counting is done for each new send - // call. - this->sent_byte_count_ = 0; - while (i != 0) { // ... each element fills the iovector ... @@ -838,14 +820,8 @@ TAO_Transport::drain_queue_i (void) if (this->flush_timer_pending ()) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - } + ACE_Reactor *reactor = eh->reactor (); + reactor->cancel_timer (this->flush_timer_id_); this->reset_flush_timer (); } @@ -922,25 +898,20 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, if (set_timer) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - this->current_deadline_ = new_deadline; - ACE_Time_Value delay = - new_deadline - ACE_OS::gettimeofday (); + ACE_Reactor *reactor = eh->reactor (); + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); - if (this->flush_timer_pending ()) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - this->flush_timer_id_ = - reactor->schedule_timer (&this->transport_timer_, - &this->current_deadline_, - delay); - } + if (this->flush_timer_pending ()) + { + reactor->cancel_timer (this->flush_timer_id_); } + + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); } return constraints_reached; @@ -1147,18 +1118,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, return 0; } - -class CTHack -{ -public: - CTHack() { enter(); } - ~CTHack() { leave(); } -private: - void enter() { x = 1; } - void leave() { x = 0; } - int x; -}; - /* * * All the methods relevant to the incoming data path of the ORB are @@ -1170,8 +1129,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Time_Value * max_wait_time, int /*block*/) { - CTHack cthack; - if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, @@ -1179,8 +1136,9 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, this->id ())); } - // First try to process messages off the head of the incoming queue. + // First try to process messages of the head of the incoming queue. int retval = this->process_queue_head (rh); + if (retval <= 0) { if (retval == -1) @@ -1191,6 +1149,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, "error while parsing the head of the queue\n", this->id())); } + return retval; } @@ -1199,7 +1158,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // The buffer on the stack which will be used to hold the input // messages - char buf[TAO_MAXBUFSIZE]; + char buf [TAO_MAXBUFSIZE]; #if defined (ACE_HAS_PURIFY) (void) ACE_OS::memset (buf, @@ -1221,35 +1180,26 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_msgblock_allocator ()); - // We'll loop trying to complete the message this number of times, - // and that's it. - unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS; - - unsigned int did_queue_message = 0; // Align the message block ACE_CDR::mb_align (&message_block); size_t recv_size = 0; + if (this->orb_core_->orb_params ()->single_read_optimization ()) { - recv_size = message_block.space (); + recv_size = + message_block.space (); } else { - recv_size = this->messaging_object ()->header_length (); + recv_size = + this->messaging_object ()->header_length (); } - // Saving the size of the received buffer in case any one needs to - // get the size of the message thats received in the - // context. Obviously the value will be changed for each recv call - // and the user is supposed to invoke the accessor only in the - // invocation context to get meaningful information. - this->recv_buffer_size_ = recv_size; - // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.wr_ptr (), + ssize_t n = this->recv (message_block.rd_ptr (), recv_size, max_wait_time); @@ -1262,7 +1212,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i: " + "TAO (%P|%t) - Transport[%d]::handle_input_i, " "read %d bytes\n", this->id (), n)); } @@ -1270,372 +1220,172 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // Set the write pointer in the stack buffer message_block.wr_ptr (n); - if (TAO_debug_level >= 10) - ACE_HEX_DUMP ((LM_DEBUG, - (const char *) message_block.rd_ptr (), - message_block.length (), - ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket"))); - + // Parse the message and try consolidating the message if + // needed. + retval = this->parse_consolidate_messages (message_block, + rh, + max_wait_time); -complete_message_and_possibly_enqueue: - // Check to see if we're still working to complete a message - if (this->uncompleted_message_) + if (retval <= 0) { - // try to complete it + if (retval == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "error while parsing and consolidating\n", + this->id ())); + } + return retval; + } - // on exit from this frame we have one of the following states: - // - // (a) an uncompleted message still in uncompleted_message_ - // AND message_block is empty - // - // (b) uncompleted_message_ zero, the completed message at the - // tail of the incoming queue; message_block could be empty - // or still contain bytes + // Make a node of the message block.. + TAO_Queued_Data qd (&message_block, + this->orb_core_->transport_message_buffer_allocator ()); - // ==> repeat - do - { - /* - * Append the "right number of bytes" to uncompleted_message_ - */ - // ==> right_number_of_bytes = MIN(bytes missing from - // uncompleted_message_, length of message_block); - size_t right_number_of_bytes = - ACE_MIN (this->uncompleted_message_->missing_data_bytes_, - message_block.length () ); + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "trying to use %u (of %u) " - "bytes to complete message missing %u bytes\n", - this->id (), - right_number_of_bytes, - message_block.length (), - this->uncompleted_message_->missing_data_bytes_)); - } + // Check whether the message was fragmented.. + if (qd.more_fragments_ || + (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Duplicate the node that we have as the node is on stack.. + TAO_Queued_Data *nqd = + TAO_Queued_Data::duplicate (qd); - // ==> append right_number_of_bytes from message_block - // to uncomplete_message_ & update read pointer of - // message_block; - - // 1. we assume that uncompleted_message_.msg_block_'s - // wr_ptr is properly maintained - // 2. we presume that uncompleted_message_.msg_block was - // allocated with enough space to contain the *entire* - // expected GIOP message, so this copy shouldn't involve an - // additional allocation - this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (), - right_number_of_bytes); - this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; - message_block.rd_ptr (right_number_of_bytes); - - switch (this->uncompleted_message_->current_state_) - { - case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER: - { - int hdrvalidity = this->messaging_object()->check_for_valid_header ( - *this->uncompleted_message_->msg_block_); - if (hdrvalidity == 0) - { - // According to the spec, Section 15.4.8, we should send - // the MessageError GIOP message on receipt of "any message...whose - // header is not properly formed (e.g., has the wrong magic value)". - // - // So, rather than returning -1, what we REALLY need to do is - // send a MessageError in reply. - // - // I'm not sure what the best way to trigger that is...probably to - // queue up a special internal-only COMPLETED message that, when - // processed, sends the MessageError as part of its processing. - return -1; - } - else if (hdrvalidity == 1) - { - // ==> update bytes missing from uncompleted_message_ - // with size of message from valid header; - this->messaging_object()->set_queued_data_from_message_header ( - this->uncompleted_message_, - *this->uncompleted_message_->msg_block_); - // ==> change state of uncompleted_event_ to - // WAITING_TO_COMPLETE_PAYLOAD; - this->uncompleted_message_->current_state_ = - TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD; - - // ==> Resize the message block to have capacity for - // the rest of the incoming message - ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_; - ACE_CDR::grow (&mb, - mb.size () - + this->uncompleted_message_->missing_data_bytes_); - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "found a valid header in the message; " - "waiting for %u bytes to complete payload\n", - this->id (), - this->uncompleted_message_->missing_data_bytes_)); - } - - // Continue the loop... - continue; - } - // In the case where we don't have enough information (hdrvalidity == -1), - // we just have to fall through and collect more. -#if 0 - else - { - // What the heck will we do with a bad header? Just - // better to close the connection and let things - // re-train from there. - if (this->uncompleted_message_->msg_block_->length () == - this->messaging_object()->header_length()) - return -1; - -#if 0 // I don't think I need this clause, but I'm leaving it just in case. - // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes; - this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; - ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0); -#endif - } -#endif - } - break; - - case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD: - // Here we have an opportunity to try to finish reading the - // uncompleted message. This is a Good Idea(TM) because there are - // good odds that either more data came available since the last - // time we read, or that we simply didn't read the whole message on - // the first read. So, we try to read again. - // - // NOTE! this changes this->uncompleted_message_! - this->try_to_complete (max_wait_time); + return this->consolidate_fragments (nqd, rh); + } - // ==> if (bytes missing from uncompleted_message_ == 0) - if (this->uncompleted_message_->missing_data_bytes_ == 0) - { - /* - * We completed the message! Hooray! - */ - // ==> place uncompleted_message_ (which is now - // complete!) at the tail of the incoming message - // queue; - - // ---> NOTE: whoever pulls this off the queue must delete it! - this->uncompleted_message_->current_state_ - = TAO_Queued_Data::COMPLETED; - - // @@CJC NEED TO CHECK RETURN VALUE HERE! - this->enqueue_incoming_message (this->uncompleted_message_); - did_queue_message = 1; - // zero out uncompleted_message_; - this->uncompleted_message_ = 0; - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "completed and queued message for processing!\n", - this->id ())); - } + // Process the message + return this->process_parsed_messages (&qd, + rh); +} - } - else - { +int +TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + if (this->parse_incoming_messages (block) == -1) + { + return -1; + } - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "still need %u bytes to complete uncompleted message.\n", - this->id (), - this->uncompleted_message_->missing_data_bytes_)); - } - } - break; + // Check whether we have a complete message for processing + ssize_t missing_data = this->missing_data (block); - default: - // @@CJC What do we do here?! - ACE_ASSERT (! "Transport::handle_input_i: unexpected state" - "in uncompleted_message_"); - } - } - // Does the order of the checks matter? In both (a) and (b), - // message_block is empty, but only in (b) is there no - // uncompleted_message_. - // ==> until (message_block is empty || there is no uncompleted_message_); - // or, rewritten in C++ looping constructs - // ==> while ( ! message_block is empty && there is an uncompleted_message_ ); - while (message_block.length() != 0 && this->uncompleted_message_); - } - - // ***************************** - // @@ CJC - // - // Once upon a time we tried to complete reading the uncompleted - // message here, but testing found that completing later worked - // better. - // ***************************** - - - // At this point, there should be nothing in uncompleted_message_. - // We now need to chop up the bytes in message_block and store any - // complete messages in the incoming message queue. - // - // ==> if (message_block still has data) - if (message_block.length () != 0) - { - TAO_Queued_Data *complete_message = 0; - do - { - if (TAO_debug_level >= 10) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ") - ACE_TEXT("extracting complete messages\n"))); - ACE_HEX_DUMP ((LM_DEBUG, - message_block.rd_ptr (), - message_block.length (), - ACE_TEXT (" from this message buffer"))); - } - complete_message = - TAO_Queued_Data::make_completed_message ( - message_block, *this->messaging_object ()); - if (complete_message) - { - this->enqueue_incoming_message (complete_message); - did_queue_message = 1; - } - } - while (complete_message != 0); - // On exit from this frame we have one of the following states: - // (a) message_block is empty - // (b) message_block contains bytes from a partial message - } - - // If, at this point, there's still data in message_block, it's - // an incomplete message. Therefore, we stuff it into the - // uncompleted_message_ and clear out message_block. - // ==> if (message_block still has data) - if (message_block.length () != 0) - { - // duplicate message_block remainder into this->uncompleted_message_ - ACE_ASSERT (this->uncompleted_message_ == 0); - this->uncompleted_message_ = - TAO_Queued_Data::make_uncompleted_message (&message_block, - *this->messaging_object ()); - ACE_ASSERT (this->uncompleted_message_ != 0); - - // In a debug build, we won't reach this point if we couldn't - // create an uncompleted message because the above ASSERT will - // trip. However, in an optimized build, the ASSERT isn't - // there, so we'll go past here. - // - // We could put a check in here similar to the ASSERT condition, - // but doing that would terminate this loop early and result in - // our never processing any completed messages that were received - // in this trip to handle_input_i. - // - // Maybe we could instead queue up a special completed message that, - // when processed, causes the connection to get closed in a non-graceful - // termination scenario. - } - - // We should have consumed ALL the bytes by now. - ACE_ASSERT (message_block.length () == 0); - - // - // We don't want to try to re-read earlier because we may not have - // an uncompleted message until we get to this point. So, if we did - // it earlier, we could have missed the opportunity to complete it - // and dispatch. - // - // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again - // to increase throughput! - - if (this->uncompleted_message_) - { - if (number_of_read_attempts--) - { - // We try to read again just in case more data arrived while - // we were doing the stuff above. This way, we can increase - // throughput without much of a penalty. + if (missing_data < 0) + { + // If we have more than one message + return this->consolidate_extra_messages (block, + rh); + } + else if (missing_data > 0) + { + // If we have missing data then try doing a read or try queueing + // them. + return this->consolidate_message (block, + missing_data, + rh, + max_wait_time); + } - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "still have an uncompleted message; " - "will try %d more times before letting " - "somebody else have a chance.\n", - this->id (), - number_of_read_attempts)); - } + return 1; +} - // We only bother trying to complete payload, not header, because the - // retry only happens in the complete-the-payload clause above. - if (this->uncompleted_message_->current_state_ == - TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD) - goto complete_message_and_possibly_enqueue; - } - else +int +TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) +{ + // If we have a queue and if the last message is not complete a + // complete one, then this read will get us the remaining data. So + // do not try to parse the header if we have an incomplete message + // in the queue. + if (this->incoming_message_queue_.is_tail_complete () != 0) + { + // As it looks like a new message has been read, process the + // message. Call the messaging object to do the parsing.. + int retval = + this->messaging_object ()->parse_incoming_messages (block); + + if (retval == -1) { - // The queue should be empty because it should have been processed - // above. But I wonder if I should put a check in here anyway. if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "giving up reading for now and returning " - "with incoming queue length = %d\n", - this->id (), - this->incoming_message_queue_.queue_length ())); - if (this->uncompleted_message_) - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "missing bytes from uncompleted message = %u\n", - this->id (), - this->uncompleted_message_->missing_data_bytes_)); - } - return 1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " + "error in incoming message\n", + this->id ())); + + return -1; } } - // **** END CJC PMG CHANGES **** + return 0; +} + + +size_t +TAO_Transport::missing_data (ACE_Message_Block &incoming) +{ + // If we have a incomplete message in the queue then find out how + // much of data is required to get a complete message. + if (this->incoming_message_queue_.is_tail_complete () == 0) + { + return this->incoming_message_queue_.missing_data_tail (); + } - return did_queue_message ? this->process_queue_head (rh) : 1; + return this->messaging_object ()->missing_data (incoming); } -void -TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) +int +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - if (this->uncompleted_message_ == 0) - return; + // Check whether the last message in the queue is complete.. + if (this->incoming_message_queue_.is_tail_complete () == 0) + { + return this->consolidate_message_queue (incoming, + missing_data, + rh, + max_wait_time); + } + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message\n", + this->id ())); + } + + // Calculate the actual length of the load that we are supposed to + // read which is equal to the <missing_data> + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.size (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); ssize_t n = 0; - size_t &missing_data = this->uncompleted_message_->missing_data_bytes_; - ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_; - // Try to complete this until we error or block right here... - for (ssize_t bytes = missing_data; - bytes != 0; - bytes -= n) + // As this used for transports where things are available in one + // shot this looping should not create any problems. + for (ssize_t bytes = missing_data; bytes != 0; bytes -= n) { // .. do a read on the socket again. - n = this->recv (mb.wr_ptr (), + n = this->recv (incoming.wr_ptr (), bytes, max_wait_time); if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "TAO (%P|%t) - Transport[%d]::consolidate_message, " "read %d bytes on attempt\n", this->id(), n)); } @@ -1645,168 +1395,375 @@ TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) break; } - mb.wr_ptr (n); + incoming.wr_ptr (n); missing_data -= n; } + + // If we got an error.. + if (n == -1) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Trasport[%d]::consolidate_message, " + "error while trying to consolidate\n", + this->id ())); + } + + return -1; + } + + // If we had gotten a EWOULDBLOCK n would be equal to zero. But we + // have to put the message in the queue anyway. So let us proceed + // to do that and return... + + // Check to see if we have messages in queue or if we have missing + // data . AT this point we cannot have have semi-complete messages + // in the queue as they would have been taken care before. Put + // ourselves in the queue and then try processing one of the + // messages.. + if ((missing_data > 0 + ||this->incoming_message_queue_.queue_length ()) + && this->incoming_message_queue_.is_tail_fragmented () == 0) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "queueing up the message\n", + this->id ())); + } + + // Get a queued data + TAO_Queued_Data *qd = + this->make_queued_data (incoming); + + // Add the missing data to the queue + qd->missing_data_ = missing_data; + + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + if (this->incoming_message_queue_.is_head_complete ()) + { + return this->process_queue_head (rh); + } + + return 0; + } + + // We dont have any missing data. Just make a queued_data node with + // the existing message block and send it to the higher layers of + // the ORB. + TAO_Queued_Data pqd (&incoming, + this->orb_core_->transport_message_buffer_allocator ()); + pqd.missing_data_ = missing_data; + this->messaging_object ()->get_message_data (&pqd); + + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (pqd.more_fragments_ || + (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Duplicate the queued data as it is on stack.. + TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); + + return this->consolidate_fragments (nqd, rh); + } + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (&pqd, + rh); } +int +TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh) +{ + // If we have received a fragment message then we have to + // consolidate <qd> with the last message in queue + // @@todo: this piece of logic follows GIOP a bit... Need to revisit + // if we have protocols other than GIOP + + // @@todo: Fragments now have too much copying overhead. Need to get + // them out if we want to have some reasonable performance metrics + // in future.. Post 1.2 seems a nice time.. + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + TAO_Queued_Data *tqd = + this->incoming_message_queue_.dequeue_tail (); + + tqd->more_fragments_ = qd->more_fragments_; + tqd->missing_data_ = qd->missing_data_; + + if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1) + { + return -1; + } + + TAO_Queued_Data::release (qd); + this->incoming_message_queue_.enqueue_tail (tqd); + this->process_queue_head (rh); + } + else + { + // if we dont have a fragment already in the queue just add it in + // the queue + this->incoming_message_queue_.enqueue_tail (qd); + } + + return 0; +} int -TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) +TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - // Get the GIOP version - CORBA::Octet major = queueable_message->major_version_; - CORBA::Octet minor = queueable_message->minor_version_; - CORBA::UShort whole = major << 8 | minor; - - // Set up a couple of pointers that are shared by the code - // for the different GIOP versions. - ACE_Message_Block *mb = 0; - TAO_Queued_Data *fragment_message = 0; - - switch(whole) - { - case 0x0100: // GIOP 1.0 - if (!queueable_message->more_fragments_) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - - // Fragments aren't supported in 1.0. This is an error and - // we should reject it somehow. What do we do here? Do we throw - // an exception to the receiving side? Do we throw an exception - // to the sending side? - // - // At the very least, we need to log the fact that we received - // nonsense. - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - " - "TAO_Transport::enqueue_incoming_message " - "detected a fragmented GIOP 1.0 message\n"), - -1); - break; - case 0x0101: // GIOP 1.1 - // In 1.1, fragments kinda suck because they don't have they're - // own message-specific header. Therefore, we have to do the - // following: - fragment_message = - this->incoming_message_queue_.find_fragment (major, minor); - - // No fragment was found - if (fragment_message == 0) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - - if (queueable_message->more_fragments_) + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", + this->id ())); + } + + // If the queue did not have a complete message put this piece of + // message in the queue. We know it did not have a complete + // message. That is why we are here. + size_t n = + this->incoming_message_queue_.copy_tail (incoming); + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "copied [%d] bytes to the tail\n", + this->id (), + n)); + } + + // Update the missing data... + missing_data = + this->incoming_message_queue_.missing_data_tail (); + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "missing [%d] bytes in the tail message\n", + this->id (), + missing_data)); + } + + // Move the read pointer of the <incoming> message block to the end + // of the copied message and process the remaining portion... + incoming.rd_ptr (n); + + // If we have some more information left in the message block.. + if (incoming.length ()) + { + // We may have to parse & consolidate. This part of the message + // doesn't seem to be part of the last message in the queue (as + // the copy () hasn't taken away this message). + int retval = this->parse_consolidate_messages (incoming, + rh, + max_wait_time); + + // If there is an error return + if (retval == -1) { - // Find the last message block in the continuation - mb = fragment_message->msg_block_; - while (mb->cont () != 0) - mb = mb->cont (); - - // Add the current message block to the end of the chain - // after adjusting the read pointer to skip the GIOP header - queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN); - mb->cont (queueable_message->msg_block_); - - // Get rid of the queuable message but save the message block - queueable_message->msg_block_ = 0; - queueable_message->release (); - - // One note is that TAO_Queued_Data contains version numbers, - // but doesn't indicate the actual protocol to which those - // version numbers refer. That's not a problem, though, because - // instances of TAO_Queued_Data live in a queue, and that queue - // lives in a particular instance of a Transport, and the - // transport instance has an association with a particular - // messaging_object. The concrete messaging object embodies a - // messaging protocol, and must cover all versions of that - // protocol. Therefore, we just need to cover the bases of all - // versions of that one protocol. + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "error while consolidating, part of the read message\n", + this->id ())); + } + return retval; } - else + else if (retval == 1) { - // There is a complete chain of fragments - fragment_message->consolidate (); + // If the message in the <incoming> message block has only + // one message left we need to process that seperately. + + // Get a queued data + TAO_Queued_Data *qd = this->make_queued_data (incoming); + + // Get the rest of the message data + this->messaging_object ()->get_message_data (qd); + + // Add the missing data to the queue + qd->missing_data_ = 0; - // Go ahead and enqueue this message - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (qd->more_fragments_ || + (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + return this->consolidate_fragments (qd, rh); + } + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + // We should surely have a message in queue now. So just + // process that. + return this->process_queue_head (rh); } - break; - case 0x0102: // GIOP 1.2 - // In 1.2, we get a little more context. There's a - // FRAGMENT message-specific header, and inside that is the - // request id with which the fragment is associated. - fragment_message = - this->incoming_message_queue_.find_fragment ( - queueable_message->request_id_); - - // No fragment was found - if (fragment_message == 0) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - - if (fragment_message->major_version_ != major || - fragment_message->minor_version_ != minor) - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - " - "TAO_Transport::enqueue_incoming_message " - "GIOP versions do not match " - "(%d.%d != %d.%d\n", - fragment_message->major_version_, - fragment_message->minor_version_, - major, minor), - -1); - - // Find the last message block in the continuation - mb = fragment_message->msg_block_; - while (mb->cont () != 0) - mb = mb->cont (); - - // Add the current message block to the end of the chain - // after adjusting the read pointer to skip the GIOP header - queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN + - TAO_GIOP_MESSAGE_FRAGMENT_HEADER); - mb->cont (queueable_message->msg_block_); - - // Remove our reference to the message block. At this point - // the message block of the fragment head owns it as part of a - // chain - queueable_message->msg_block_ = 0; - - if (!queueable_message->more_fragments_) + + // parse_consolidate_messages () would have processed one of the + // messages, so we better return as we dont want to starve other + // threads. + return 0; + } + + // If we still have some missing data.. + if (missing_data > 0) + { + // Get the last message from the Queue + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_tail (); + + if (TAO_debug_level > 5) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "trying recv, again\n", + this->id ())); + } + + // Try to do a read again. If we have some luck it would be + // great.. + ssize_t n = this->recv (qd->msg_block_->wr_ptr (), + missing_data, + max_wait_time); + + if (TAO_debug_level > 5) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "recv retval [%d]\n", + this->id (), + n)); + } + + // Error... + if (n < 0) + { + return n; + } + + // If we get a EWOULDBLOCK ie. n==0, we should anyway put the + // message in queue before returning.. + // Move the write pointer + qd->msg_block_->wr_ptr (n); + + // Decrement the missing data + qd->missing_data_ -= n; + + // Now put the TAO_Queued_Data back in the queue + this->incoming_message_queue_.enqueue_tail (qd); + + // Any way as we have come this far and are about to return, + // just try to process a message if it is there in the queue. + if (this->incoming_message_queue_.is_head_complete ()) { - // This is the end of the fragments for this request - fragment_message->consolidate (); + return this->process_queue_head (rh); } - // Get rid of the queuable message - queueable_message->release (); - break; - default: - if (!queueable_message->more_fragments_) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - // This is an unknown GIOP version - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - " - "TAO_Transport::enqueue_incoming_message " - "can not handle a fragmented GIOP %d.%d " - "message\n", major, minor), - -1); + return 0; } - return 0; + // Process a message in the head of the queue if we have one.. + return this->process_queue_head (rh); } int +TAO_Transport::consolidate_extra_messages (ACE_Message_Block + &incoming, + TAO_Resume_Handle &rh) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", + this->id ())); + } + + // Pick the tail of the queue + TAO_Queued_Data *tail = + this->incoming_message_queue_.dequeue_tail (); + + if (tail) + { + // If we have a node in the tail, checek to see whether it needs + // consolidation. If so, just consolidate it. + if (this->messaging_object ()->consolidate_node (tail, incoming) == -1) + { + return -1; + } + + // .. put the tail back in queue.. + this->incoming_message_queue_.enqueue_tail (tail); + } + + int retval = 1; + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " + "extracting extra messages\n", + this->id ())); + } + + // Extract messages.. + while (retval == 1) + { + TAO_Queued_Data *q_data = 0; + + retval = + this->messaging_object ()->extract_next_message (incoming, + q_data); + if (q_data) + { + // If we have read a framented message then... + if (q_data->more_fragments_ || + q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + this->consolidate_fragments (q_data, rh); + } + else + { + this->incoming_message_queue_.enqueue_tail (q_data); + } + } + } + + // In case of error return.. + if (retval == -1) + { + return retval; + } + + return this->process_queue_head (rh); +} + +int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) { // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = qd->msg_type_; + // int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { if (TAO_debug_level > 0) @@ -1871,69 +1828,126 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return 0; } -int -TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) +TAO_Queued_Data * +TAO_Transport::make_queued_data (ACE_Message_Block &incoming) { - if (TAO_debug_level > 3) + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data ( + this->orb_core_->transport_message_buffer_allocator ()); + + // Get the flag for the details of the data block... + ACE_Message_Block::Message_Flags flg = + incoming.self_flags (); + + if (ACE_BIT_DISABLED (flg, + ACE_Message_Block::DONT_DELETE)) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head\n", - this->id ())); + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = ACE_Message_Block::duplicate (&incoming); } + else + { + // As we are in CORBA mode, all the data blocks would be aligned + // on an 8 byte boundary. Hence create a data block for more + // than the actual length + ACE_Data_Block *db = + this->orb_core_->create_input_cdr_data_block (incoming.length ()+ + ACE_CDR::MAX_ALIGNMENT); - if (this->incoming_message_queue_.is_head_complete () != 1) - return 1; + // Get the allocator.. + ACE_Allocator *alloc = + this->orb_core_->input_cdr_msgblock_allocator (); - // Get the message on the head of the queue.. - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_head (); + // Make message block.. + ACE_Message_Block mb (db, + 0, + alloc); + + // Duplicate the block.. + qd->msg_block_ = mb.duplicate (); + + // Align the message block + ACE_CDR::mb_align (qd->msg_block_); + + // Copy the data.. + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + } + + + return qd; +} +int +TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) +{ if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "the size of the queue is [%d]\n", - this->id (), - this->incoming_message_queue_.queue_length())); + "TAO (%P|%t) - Transport[%d]::process_queue_head\n", + this->id ())); } - // Now that we have pulled out out one message out of the queue, - // check whether we have one more message in the queue... - if (this->incoming_message_queue_.queue_length () > 0) + + // See if the message in the head of the queue is complete... + if (this->incoming_message_queue_.is_head_complete () > 0) { - if (TAO_debug_level > 0) + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); + + if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "notify reactor\n", - this->id ())); + "the size of the queue is [%d]\n", + this->id (), + this->incoming_message_queue_.queue_length())); + } + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.is_head_complete () > 0) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "notify reactor\n", + this->id ())); + } + int retval = + this->notify_reactor (); + + if (retval == 1) + { + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); + } + else if (retval < 0) + return -1; + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); } - int retval = - this->notify_reactor (); - if (retval == 1) + // Process the message... + if (this->process_parsed_messages (qd, rh) == -1) { - // Let the class know that it doesn't need to resume the - // handle.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); + return -1; } - else if (retval < 0) - return -1; - } - else - { - // As we are ready to process the last message just resume - // the handle. Set the flag incase someone had reset the flag.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); - } - // Process the message... - int retval = this->process_parsed_messages (qd, rh); + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); - // Delete the Queued_Data.. - TAO_Queued_Data::release (qd); + return 0; + } - return (retval == -1) ? -1 : 0; + return 1; } int @@ -1945,8 +1959,6 @@ TAO_Transport::notify_reactor (void) } ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; // Get the reactor associated with the event handler ACE_Reactor *reactor = this->orb_core ()->reactor (); @@ -1983,18 +1995,6 @@ TAO_Transport::transport_cache_manager (void) return this->orb_core_->lane_resources ().transport_cache (); } -size_t -TAO_Transport::recv_buffer_size (void) -{ - return this->recv_buffer_size_; -} - -size_t -TAO_Transport::sent_byte_count (void) -{ - return this->sent_byte_count_; -} - void TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp) { |