diff options
author | Chris Cleeland <chris.cleeland@gmail.com> | 2003-12-15 22:31:47 +0000 |
---|---|---|
committer | Chris Cleeland <chris.cleeland@gmail.com> | 2003-12-15 22:31:47 +0000 |
commit | a2b02673b2809eac5f38e558c24f68613fddc64b (patch) | |
tree | 1552aab7b624fa48016ab63fc5d50a087ce222a2 | |
parent | 2734c6742965bf533afca9c3ecfa3b34d08b0f5a (diff) | |
download | ATCD-unlabeled-1.94.2.tar.gz |
Tag: pmb_integrationunlabeled-1.94.2
Started work on performance enhancements for PMB.
-rw-r--r-- | TAO/tao/Transport.cpp | 1208 |
1 files changed, 604 insertions, 604 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 9923ff1f895..1640644edf9 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -19,7 +19,9 @@ #include "Resume_Handle.h" #include "Codeset_Manager.h" #include "Codeset_Translator_Factory.h" +#include "GIOP_Message_State.h" #include "ace/OS_NS_sys_time.h" +#include "ace/Message_Block.h" #include "ace/Reactor.h" @@ -110,12 +112,15 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , head_ (0) , tail_ (0) , incoming_message_queue_ (orb_core) + , uncompleted_message_ (0) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) , id_ ((size_t) this) , purging_order_ (0) + , recv_buffer_size_ (0) + , sent_byte_count_ (0) , char_translator_ (0) , wchar_translator_ (0) , tcs_set_ (0) @@ -236,13 +241,9 @@ TAO_Transport::generate_request_header ( { // codeset service context is only supposed to be sent in the first request // on a particular connection. - if (this->first_request_) - { - this->orb_core ()->codeset_manager ()->generate_service_context ( - opdetails, - *this - ); - } + TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager(); + if (csm && this->first_request_) + csm->generate_service_context( opdetails, *this ); if (this->messaging_object ()->generate_request_header (opdetails, spec, @@ -604,7 +605,12 @@ int TAO_Transport::schedule_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; if (TAO_debug_level > 3) { @@ -620,7 +626,12 @@ int TAO_Transport::cancel_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; if (TAO_debug_level > 3) { @@ -739,6 +750,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) // no bytes are sent send() can only return 0 or -1 ACE_ASSERT (byte_count != 0); + // Total no. of bytes sent for a send call + this->sent_byte_count_ += byte_count; + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, @@ -762,6 +776,10 @@ TAO_Transport::drain_queue_i (void) // We loop over all the elements in the queue ... TAO_Queued_Message *i = this->head_; + // reset the value so that the counting is done for each new send + // call. + this->sent_byte_count_ = 0; + while (i != 0) { // ... each element fills the iovector ... @@ -820,8 +838,14 @@ TAO_Transport::drain_queue_i (void) if (this->flush_timer_pending ()) { ACE_Event_Handler *eh = this->event_handler_i (); - ACE_Reactor *reactor = eh->reactor (); - reactor->cancel_timer (this->flush_timer_id_); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + } this->reset_flush_timer (); } @@ -898,20 +922,25 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, if (set_timer) { ACE_Event_Handler *eh = this->event_handler_i (); - ACE_Reactor *reactor = eh->reactor (); - this->current_deadline_ = new_deadline; - ACE_Time_Value delay = - new_deadline - ACE_OS::gettimeofday (); - - if (this->flush_timer_pending ()) + if (eh != 0) { - reactor->cancel_timer (this->flush_timer_id_); - } + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); - this->flush_timer_id_ = - reactor->schedule_timer (&this->transport_timer_, - &this->current_deadline_, - delay); + if (this->flush_timer_pending ()) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); + } + } } return constraints_reached; @@ -1118,6 +1147,18 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, return 0; } + +class CTHack +{ +public: + CTHack() { enter(); } + ~CTHack() { leave(); } +private: + void enter() { x = 1; } + void leave() { x = 0; } + int x; +}; + /* * * All the methods relevant to the incoming data path of the ORB are @@ -1129,6 +1170,8 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Time_Value * max_wait_time, int /*block*/) { + CTHack cthack; + if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, @@ -1136,9 +1179,8 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, this->id ())); } - // First try to process messages of the head of the incoming queue. + // First try to process messages off the head of the incoming queue. int retval = this->process_queue_head (rh); - if (retval <= 0) { if (retval == -1) @@ -1149,7 +1191,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, "error while parsing the head of the queue\n", this->id())); } - return retval; } @@ -1158,7 +1199,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // The buffer on the stack which will be used to hold the input // messages - char buf [TAO_MAXBUFSIZE]; + char buf[TAO_MAXBUFSIZE]; #if defined (ACE_HAS_PURIFY) (void) ACE_OS::memset (buf, @@ -1180,26 +1221,35 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_msgblock_allocator ()); + // We'll loop trying to complete the message this number of times, + // and that's it. + unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS; + + unsigned int did_queue_message = 0; // Align the message block ACE_CDR::mb_align (&message_block); size_t recv_size = 0; - if (this->orb_core_->orb_params ()->single_read_optimization ()) { - recv_size = - message_block.space (); + recv_size = message_block.space (); } else { - recv_size = - this->messaging_object ()->header_length (); + recv_size = this->messaging_object ()->header_length (); } + // Saving the size of the received buffer in case any one needs to + // get the size of the message thats received in the + // context. Obviously the value will be changed for each recv call + // and the user is supposed to invoke the accessor only in the + // invocation context to get meaningful information. + this->recv_buffer_size_ = recv_size; + // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), + ssize_t n = this->recv (message_block.wr_ptr (), recv_size, max_wait_time); @@ -1212,7 +1262,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "TAO (%P|%t) - Transport[%d]::handle_input_i: " "read %d bytes\n", this->id (), n)); } @@ -1220,172 +1270,372 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Parse the message and try consolidating the message if - // needed. - retval = this->parse_consolidate_messages (message_block, - rh, - max_wait_time); + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + (const char *) message_block.rd_ptr (), + message_block.length (), + ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket"))); - if (retval <= 0) - { - if (retval == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " - "error while parsing and consolidating\n", - this->id ())); - } - return retval; - } - - // Make a node of the message block.. - TAO_Queued_Data qd (&message_block, - this->orb_core_->transport_message_buffer_allocator ()); - - // Extract the data for the node.. - this->messaging_object ()->get_message_data (&qd); - // Check whether the message was fragmented.. - if (qd.more_fragments_ || - (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) +complete_message_and_possibly_enqueue: + // Check to see if we're still working to complete a message + if (this->uncompleted_message_) { - // Duplicate the node that we have as the node is on stack.. - TAO_Queued_Data *nqd = - TAO_Queued_Data::duplicate (qd); + // try to complete it - return this->consolidate_fragments (nqd, rh); - } + // on exit from this frame we have one of the following states: + // + // (a) an uncompleted message still in uncompleted_message_ + // AND message_block is empty + // + // (b) uncompleted_message_ zero, the completed message at the + // tail of the incoming queue; message_block could be empty + // or still contain bytes - // Process the message - return this->process_parsed_messages (&qd, - rh); -} + // ==> repeat + do + { + /* + * Append the "right number of bytes" to uncompleted_message_ + */ + // ==> right_number_of_bytes = MIN(bytes missing from + // uncompleted_message_, length of message_block); + size_t right_number_of_bytes = + ACE_MIN (this->uncompleted_message_->missing_data_bytes_, + message_block.length () ); -int -TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) -{ - // Parse the incoming message for validity. The check needs to be - // performed by the messaging objects. - if (this->parse_incoming_messages (block) == -1) - { - return -1; - } + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "trying to use %u (of %u) " + "bytes to complete message missing %u bytes\n", + this->id (), + right_number_of_bytes, + message_block.length (), + this->uncompleted_message_->missing_data_bytes_)); + } - // Check whether we have a complete message for processing - ssize_t missing_data = this->missing_data (block); + // ==> append right_number_of_bytes from message_block + // to uncomplete_message_ & update read pointer of + // message_block; + + // 1. we assume that uncompleted_message_.msg_block_'s + // wr_ptr is properly maintained + // 2. we presume that uncompleted_message_.msg_block was + // allocated with enough space to contain the *entire* + // expected GIOP message, so this copy shouldn't involve an + // additional allocation + this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (), + right_number_of_bytes); + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + message_block.rd_ptr (right_number_of_bytes); + + switch (this->uncompleted_message_->current_state_) + { + case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER: + { + int hdrvalidity = this->messaging_object()->check_for_valid_header ( + *this->uncompleted_message_->msg_block_); + if (hdrvalidity == 0) + { + // According to the spec, Section 15.4.8, we should send + // the MessageError GIOP message on receipt of "any message...whose + // header is not properly formed (e.g., has the wrong magic value)". + // + // So, rather than returning -1, what we REALLY need to do is + // send a MessageError in reply. + // + // I'm not sure what the best way to trigger that is...probably to + // queue up a special internal-only COMPLETED message that, when + // processed, sends the MessageError as part of its processing. + return -1; + } + else if (hdrvalidity == 1) + { + // ==> update bytes missing from uncompleted_message_ + // with size of message from valid header; + this->messaging_object()->set_queued_data_from_message_header ( + this->uncompleted_message_, + *this->uncompleted_message_->msg_block_); + // ==> change state of uncompleted_event_ to + // WAITING_TO_COMPLETE_PAYLOAD; + this->uncompleted_message_->current_state_ = + TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD; + + // ==> Resize the message block to have capacity for + // the rest of the incoming message + ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_; + ACE_CDR::grow (&mb, + mb.size () + + this->uncompleted_message_->missing_data_bytes_); + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "found a valid header in the message; " + "waiting for %u bytes to complete payload\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + + // Continue the loop... + continue; + } + // In the case where we don't have enough information (hdrvalidity == -1), + // we just have to fall through and collect more. +#if 0 + else + { + // What the heck will we do with a bad header? Just + // better to close the connection and let things + // re-train from there. + if (this->uncompleted_message_->msg_block_->length () == + this->messaging_object()->header_length()) + return -1; + +#if 0 // I don't think I need this clause, but I'm leaving it just in case. + // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes; + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0); +#endif + } +#endif + } + break; + + case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD: + // Here we have an opportunity to try to finish reading the + // uncompleted message. This is a Good Idea(TM) because there are + // good odds that either more data came available since the last + // time we read, or that we simply didn't read the whole message on + // the first read. So, we try to read again. + // + // NOTE! this changes this->uncompleted_message_! + this->try_to_complete (max_wait_time); + // ==> if (bytes missing from uncompleted_message_ == 0) + if (this->uncompleted_message_->missing_data_bytes_ == 0) + { + /* + * We completed the message! Hooray! + */ + // ==> place uncompleted_message_ (which is now + // complete!) at the tail of the incoming message + // queue; + + // ---> NOTE: whoever pulls this off the queue must delete it! + this->uncompleted_message_->current_state_ + = TAO_Queued_Data::COMPLETED; + + // @@CJC NEED TO CHECK RETURN VALUE HERE! + this->enqueue_incoming_message (this->uncompleted_message_); + did_queue_message = 1; + // zero out uncompleted_message_; + this->uncompleted_message_ = 0; + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "completed and queued message for processing!\n", + this->id ())); + } - if (missing_data < 0) - { - // If we have more than one message - return this->consolidate_extra_messages (block, - rh); - } - else if (missing_data > 0) - { - // If we have missing data then try doing a read or try queueing - // them. - return this->consolidate_message (block, - missing_data, - rh, - max_wait_time); - } + } + else + { - return 1; -} + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still need %u bytes to complete uncompleted message.\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + } + break; -int -TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) -{ - // If we have a queue and if the last message is not complete a - // complete one, then this read will get us the remaining data. So - // do not try to parse the header if we have an incomplete message - // in the queue. - if (this->incoming_message_queue_.is_tail_complete () != 0) - { - // As it looks like a new message has been read, process the - // message. Call the messaging object to do the parsing.. - int retval = - this->messaging_object ()->parse_incoming_messages (block); + default: + // @@CJC What do we do here?! + ACE_ASSERT (! "Transport::handle_input_i: unexpected state" + "in uncompleted_message_"); + } + } + // Does the order of the checks matter? In both (a) and (b), + // message_block is empty, but only in (b) is there no + // uncompleted_message_. + // ==> until (message_block is empty || there is no uncompleted_message_); + // or, rewritten in C++ looping constructs + // ==> while ( ! message_block is empty && there is an uncompleted_message_ ); + while (message_block.length() != 0 && this->uncompleted_message_); + } + + // ***************************** + // @@ CJC + // + // Once upon a time we tried to complete reading the uncompleted + // message here, but testing found that completing later worked + // better. + // ***************************** + + + // At this point, there should be nothing in uncompleted_message_. + // We now need to chop up the bytes in message_block and store any + // complete messages in the incoming message queue. + // + // ==> if (message_block still has data) + if (message_block.length () != 0) + { + TAO_Queued_Data *complete_message = 0; + do + { + if (TAO_debug_level >= 10) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ") + ACE_TEXT("extracting complete messages\n"))); + ACE_HEX_DUMP ((LM_DEBUG, + message_block.rd_ptr (), + message_block.length (), + ACE_TEXT (" from this message buffer"))); + } - if (retval == -1) + complete_message = + TAO_Queued_Data::make_completed_message ( + message_block, *this->messaging_object ()); + if (complete_message) + { + this->enqueue_incoming_message (complete_message); + did_queue_message = 1; + } + } + while (complete_message != 0); + // On exit from this frame we have one of the following states: + // (a) message_block is empty + // (b) message_block contains bytes from a partial message + } + + // If, at this point, there's still data in message_block, it's + // an incomplete message. Therefore, we stuff it into the + // uncompleted_message_ and clear out message_block. + // ==> if (message_block still has data) + if (message_block.length () != 0) + { + // duplicate message_block remainder into this->uncompleted_message_ + ACE_ASSERT (this->uncompleted_message_ == 0); + this->uncompleted_message_ = + TAO_Queued_Data::make_uncompleted_message (&message_block, + *this->messaging_object ()); + ACE_ASSERT (this->uncompleted_message_ != 0); + + // In a debug build, we won't reach this point if we couldn't + // create an uncompleted message because the above ASSERT will + // trip. However, in an optimized build, the ASSERT isn't + // there, so we'll go past here. + // + // We could put a check in here similar to the ASSERT condition, + // but doing that would terminate this loop early and result in + // our never processing any completed messages that were received + // in this trip to handle_input_i. + // + // Maybe we could instead queue up a special completed message that, + // when processed, causes the connection to get closed in a non-graceful + // termination scenario. + } + + // We should have consumed ALL the bytes by now. + ACE_ASSERT (message_block.length () == 0); + + // + // We don't want to try to re-read earlier because we may not have + // an uncompleted message until we get to this point. So, if we did + // it earlier, we could have missed the opportunity to complete it + // and dispatch. + // + // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again + // to increase throughput! + + if (this->uncompleted_message_) + { + if (number_of_read_attempts--) { + // We try to read again just in case more data arrived while + // we were doing the stuff above. This way, we can increase + // throughput without much of a penalty. + if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " - "error in incoming message\n", - this->id ())); + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still have an uncompleted message; " + "will try %d more times before letting " + "somebody else have a chance.\n", + this->id (), + number_of_read_attempts)); + } - return -1; + // We only bother trying to complete payload, not header, because the + // retry only happens in the complete-the-payload clause above. + if (this->uncompleted_message_->current_state_ == + TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD) + goto complete_message_and_possibly_enqueue; + } + else + { + // The queue should be empty because it should have been processed + // above. But I wonder if I should put a check in here anyway. + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "giving up reading for now and returning " + "with incoming queue length = %d\n", + this->id (), + this->incoming_message_queue_.queue_length ())); + if (this->uncompleted_message_) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "missing bytes from uncompleted message = %u\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + return 1; } } - return 0; -} - - -size_t -TAO_Transport::missing_data (ACE_Message_Block &incoming) -{ - // If we have a incomplete message in the queue then find out how - // much of data is required to get a complete message. - if (this->incoming_message_queue_.is_tail_complete () == 0) - { - return this->incoming_message_queue_.missing_data_tail (); - } + // **** END CJC PMG CHANGES **** - return this->messaging_object ()->missing_data (incoming); + return did_queue_message ? this->process_queue_head (rh) : 1; } -int -TAO_Transport::consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) +void +TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) { - // Check whether the last message in the queue is complete.. - if (this->incoming_message_queue_.is_tail_complete () == 0) - { - return this->consolidate_message_queue (incoming, - missing_data, - rh, - max_wait_time); - } - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message\n", - this->id ())); - } - - // Calculate the actual length of the load that we are supposed to - // read which is equal to the <missing_data> + length of the buffer - // that we have.. - size_t payload = missing_data + incoming.size (); - - // Grow the buffer to the size of the message - ACE_CDR::grow (&incoming, - payload); + if (this->uncompleted_message_ == 0) + return; ssize_t n = 0; + size_t &missing_data = this->uncompleted_message_->missing_data_bytes_; + ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_; - // As this used for transports where things are available in one - // shot this looping should not create any problems. - for (ssize_t bytes = missing_data; bytes != 0; bytes -= n) + // Try to complete this until we error or block right here... + for (ssize_t bytes = missing_data; + bytes != 0; + bytes -= n) { // .. do a read on the socket again. - n = this->recv (incoming.wr_ptr (), + n = this->recv (mb.wr_ptr (), bytes, max_wait_time); if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "TAO (%P|%t) - Transport[%d]::handle_input_i, " "read %d bytes on attempt\n", this->id(), n)); } @@ -1395,375 +1645,168 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, break; } - incoming.wr_ptr (n); + mb.wr_ptr (n); missing_data -= n; } - - // If we got an error.. - if (n == -1) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Trasport[%d]::consolidate_message, " - "error while trying to consolidate\n", - this->id ())); - } - - return -1; - } - - // If we had gotten a EWOULDBLOCK n would be equal to zero. But we - // have to put the message in the queue anyway. So let us proceed - // to do that and return... - - // Check to see if we have messages in queue or if we have missing - // data . AT this point we cannot have have semi-complete messages - // in the queue as they would have been taken care before. Put - // ourselves in the queue and then try processing one of the - // messages.. - if ((missing_data > 0 - ||this->incoming_message_queue_.queue_length ()) - && this->incoming_message_queue_.is_tail_fragmented () == 0) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " - "queueing up the message\n", - this->id ())); - } - - // Get a queued data - TAO_Queued_Data *qd = - this->make_queued_data (incoming); - - // Add the missing data to the queue - qd->missing_data_ = missing_data; - - // Get the rest of the messaging data - this->messaging_object ()->get_message_data (qd); - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - if (this->incoming_message_queue_.is_head_complete ()) - { - return this->process_queue_head (rh); - } - - return 0; - } - - // We dont have any missing data. Just make a queued_data node with - // the existing message block and send it to the higher layers of - // the ORB. - TAO_Queued_Data pqd (&incoming, - this->orb_core_->transport_message_buffer_allocator ()); - pqd.missing_data_ = missing_data; - this->messaging_object ()->get_message_data (&pqd); - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (pqd.more_fragments_ || - (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - // Duplicate the queued data as it is on stack.. - TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); - - return this->consolidate_fragments (nqd, rh); - } - - // Now we have a full message in our buffer. Just go ahead and - // process that - return this->process_parsed_messages (&pqd, - rh); } -int -TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, - TAO_Resume_Handle &rh) -{ - // If we have received a fragment message then we have to - // consolidate <qd> with the last message in queue - // @@todo: this piece of logic follows GIOP a bit... Need to revisit - // if we have protocols other than GIOP - - // @@todo: Fragments now have too much copying overhead. Need to get - // them out if we want to have some reasonable performance metrics - // in future.. Post 1.2 seems a nice time.. - if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - TAO_Queued_Data *tqd = - this->incoming_message_queue_.dequeue_tail (); - - tqd->more_fragments_ = qd->more_fragments_; - tqd->missing_data_ = qd->missing_data_; - - if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1) - { - return -1; - } - - TAO_Queued_Data::release (qd); - this->incoming_message_queue_.enqueue_tail (tqd); - this->process_queue_head (rh); - } - else - { - // if we dont have a fragment already in the queue just add it in - // the queue - this->incoming_message_queue_.enqueue_tail (qd); - } - - return 0; -} int -TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) +TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", - this->id ())); - } - - // If the queue did not have a complete message put this piece of - // message in the queue. We know it did not have a complete - // message. That is why we are here. - size_t n = - this->incoming_message_queue_.copy_tail (incoming); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "copied [%d] bytes to the tail\n", - this->id (), - n)); - } - - // Update the missing data... - missing_data = - this->incoming_message_queue_.missing_data_tail (); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "missing [%d] bytes in the tail message\n", - this->id (), - missing_data)); - } - - // Move the read pointer of the <incoming> message block to the end - // of the copied message and process the remaining portion... - incoming.rd_ptr (n); - - // If we have some more information left in the message block.. - if (incoming.length ()) - { - // We may have to parse & consolidate. This part of the message - // doesn't seem to be part of the last message in the queue (as - // the copy () hasn't taken away this message). - int retval = this->parse_consolidate_messages (incoming, - rh, - max_wait_time); - - // If there is an error return - if (retval == -1) + // Get the GIOP version + CORBA::Octet major = queueable_message->major_version_; + CORBA::Octet minor = queueable_message->minor_version_; + CORBA::UShort whole = major << 8 | minor; + + // Set up a couple of pointers that are shared by the code + // for the different GIOP versions. + ACE_Message_Block *mb = 0; + TAO_Queued_Data *fragment_message = 0; + + switch(whole) + { + case 0x0100: // GIOP 1.0 + if (!queueable_message->more_fragments_) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + // Fragments aren't supported in 1.0. This is an error and + // we should reject it somehow. What do we do here? Do we throw + // an exception to the receiving side? Do we throw an exception + // to the sending side? + // + // At the very least, we need to log the fact that we received + // nonsense. + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "detected a fragmented GIOP 1.0 message\n"), + -1); + break; + case 0x0101: // GIOP 1.1 + // In 1.1, fragments kinda suck because they don't have they're + // own message-specific header. Therefore, we have to do the + // following: + fragment_message = + this->incoming_message_queue_.find_fragment (major, minor); + + // No fragment was found + if (fragment_message == 0) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + if (queueable_message->more_fragments_) { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "error while consolidating, part of the read message\n", - this->id ())); - } - return retval; + // Find the last message block in the continuation + mb = fragment_message->msg_block_; + while (mb->cont () != 0) + mb = mb->cont (); + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the GIOP header + queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN); + mb->cont (queueable_message->msg_block_); + + // Get rid of the queuable message but save the message block + queueable_message->msg_block_ = 0; + queueable_message->release (); + + // One note is that TAO_Queued_Data contains version numbers, + // but doesn't indicate the actual protocol to which those + // version numbers refer. That's not a problem, though, because + // instances of TAO_Queued_Data live in a queue, and that queue + // lives in a particular instance of a Transport, and the + // transport instance has an association with a particular + // messaging_object. The concrete messaging object embodies a + // messaging protocol, and must cover all versions of that + // protocol. Therefore, we just need to cover the bases of all + // versions of that one protocol. } - else if (retval == 1) - { - // If the message in the <incoming> message block has only - // one message left we need to process that seperately. - - // Get a queued data - TAO_Queued_Data *qd = this->make_queued_data (incoming); - - // Get the rest of the message data - this->messaging_object ()->get_message_data (qd); - - // Add the missing data to the queue - qd->missing_data_ = 0; - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (qd->more_fragments_ || - (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - return this->consolidate_fragments (qd, rh); - } - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - // We should surely have a message in queue now. So just - // process that. - return this->process_queue_head (rh); - } - - // parse_consolidate_messages () would have processed one of the - // messages, so we better return as we dont want to starve other - // threads. - return 0; - } - - // If we still have some missing data.. - if (missing_data > 0) - { - // Get the last message from the Queue - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_tail (); - - if (TAO_debug_level > 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "trying recv, again\n", - this->id ())); - } - - // Try to do a read again. If we have some luck it would be - // great.. - ssize_t n = this->recv (qd->msg_block_->wr_ptr (), - missing_data, - max_wait_time); - - if (TAO_debug_level > 5) + else { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "recv retval [%d]\n", - this->id (), - n)); - } + // There is a complete chain of fragments + fragment_message->consolidate (); - // Error... - if (n < 0) - { - return n; + // Go ahead and enqueue this message + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); } - - // If we get a EWOULDBLOCK ie. n==0, we should anyway put the - // message in queue before returning.. - // Move the write pointer - qd->msg_block_->wr_ptr (n); - - // Decrement the missing data - qd->missing_data_ -= n; - - // Now put the TAO_Queued_Data back in the queue - this->incoming_message_queue_.enqueue_tail (qd); - - // Any way as we have come this far and are about to return, - // just try to process a message if it is there in the queue. - if (this->incoming_message_queue_.is_head_complete ()) + break; + case 0x0102: // GIOP 1.2 + // In 1.2, we get a little more context. There's a + // FRAGMENT message-specific header, and inside that is the + // request id with which the fragment is associated. + fragment_message = + this->incoming_message_queue_.find_fragment ( + queueable_message->request_id_); + + // No fragment was found + if (fragment_message == 0) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + if (fragment_message->major_version_ != major || + fragment_message->minor_version_ != minor) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "GIOP versions do not match " + "(%d.%d != %d.%d\n", + fragment_message->major_version_, + fragment_message->minor_version_, + major, minor), + -1); + + // Find the last message block in the continuation + mb = fragment_message->msg_block_; + while (mb->cont () != 0) + mb = mb->cont (); + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the GIOP header + queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN + + TAO_GIOP_MESSAGE_FRAGMENT_HEADER); + mb->cont (queueable_message->msg_block_); + + // Remove our reference to the message block. At this point + // the message block of the fragment head owns it as part of a + // chain + queueable_message->msg_block_ = 0; + + if (!queueable_message->more_fragments_) { - return this->process_queue_head (rh); + // This is the end of the fragments for this request + fragment_message->consolidate (); } - return 0; + // Get rid of the queuable message + queueable_message->release (); + break; + default: + if (!queueable_message->more_fragments_) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + // This is an unknown GIOP version + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "can not handle a fragmented GIOP %d.%d " + "message\n", major, minor), + -1); } - // Process a message in the head of the queue if we have one.. - return this->process_queue_head (rh); + return 0; } int -TAO_Transport::consolidate_extra_messages (ACE_Message_Block - &incoming, - TAO_Resume_Handle &rh) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", - this->id ())); - } - - // Pick the tail of the queue - TAO_Queued_Data *tail = - this->incoming_message_queue_.dequeue_tail (); - - if (tail) - { - // If we have a node in the tail, checek to see whether it needs - // consolidation. If so, just consolidate it. - if (this->messaging_object ()->consolidate_node (tail, incoming) == -1) - { - return -1; - } - - // .. put the tail back in queue.. - this->incoming_message_queue_.enqueue_tail (tail); - } - - int retval = 1; - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " - "extracting extra messages\n", - this->id ())); - } - - // Extract messages.. - while (retval == 1) - { - TAO_Queued_Data *q_data = 0; - - retval = - this->messaging_object ()->extract_next_message (incoming, - q_data); - if (q_data) - { - // If we have read a framented message then... - if (q_data->more_fragments_ || - q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - this->consolidate_fragments (q_data, rh); - } - else - { - this->incoming_message_queue_.enqueue_tail (q_data); - } - } - } - - // In case of error return.. - if (retval == -1) - { - return retval; - } - - return this->process_queue_head (rh); -} - -int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) { // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = qd->msg_type_; - // int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { if (TAO_debug_level > 0) @@ -1828,57 +1871,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return 0; } -TAO_Queued_Data * -TAO_Transport::make_queued_data (ACE_Message_Block &incoming) -{ - // Get an instance of TAO_Queued_Data - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // Get the flag for the details of the data block... - ACE_Message_Block::Message_Flags flg = - incoming.self_flags (); - - if (ACE_BIT_DISABLED (flg, - ACE_Message_Block::DONT_DELETE)) - { - // Duplicate the data block before putting it in the queue. - qd->msg_block_ = ACE_Message_Block::duplicate (&incoming); - } - else - { - // As we are in CORBA mode, all the data blocks would be aligned - // on an 8 byte boundary. Hence create a data block for more - // than the actual length - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (incoming.length ()+ - ACE_CDR::MAX_ALIGNMENT); - - // Get the allocator.. - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - // Make message block.. - ACE_Message_Block mb (db, - 0, - alloc); - - // Duplicate the block.. - qd->msg_block_ = mb.duplicate (); - - // Align the message block - ACE_CDR::mb_align (qd->msg_block_); - - // Copy the data.. - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - } - - - return qd; -} - int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) { @@ -1889,65 +1881,59 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) this->id ())); } - // See if the message in the head of the queue is complete... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - // Get the message on the head of the queue.. - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_head (); + if (this->incoming_message_queue_.is_head_complete () != 1) + return 1; - if (TAO_debug_level > 3) + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "the size of the queue is [%d]\n", + this->id (), + this->incoming_message_queue_.queue_length())); + } + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.queue_length () > 0) + { + if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "the size of the queue is [%d]\n", - this->id (), - this->incoming_message_queue_.queue_length())); - } - // Now that we have pulled out out one message out of the queue, - // check whether we have one more message in the queue... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "notify reactor\n", - this->id ())); - - } - int retval = - this->notify_reactor (); + "notify reactor\n", + this->id ())); - if (retval == 1) - { - // Let the class know that it doesn't need to resume the - // handle.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); - } - else if (retval < 0) - return -1; - } - else - { - // As we are ready to process the last message just resume - // the handle. Set the flag incase someone had reset the flag.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); } + int retval = + this->notify_reactor (); - // Process the message... - if (this->process_parsed_messages (qd, rh) == -1) + if (retval == 1) { - return -1; + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); } + else if (retval < 0) + return -1; + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + } - // Delete the Queued_Data.. - TAO_Queued_Data::release (qd); + // Process the message... + int retval = this->process_parsed_messages (qd, rh); - return 0; - } + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); - return 1; + return (retval == -1) ? -1 : 0; } int @@ -1959,6 +1945,8 @@ TAO_Transport::notify_reactor (void) } ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; // Get the reactor associated with the event handler ACE_Reactor *reactor = this->orb_core ()->reactor (); @@ -1995,6 +1983,18 @@ TAO_Transport::transport_cache_manager (void) return this->orb_core_->lane_resources ().transport_cache (); } +size_t +TAO_Transport::recv_buffer_size (void) +{ + return this->recv_buffer_size_; +} + +size_t +TAO_Transport::sent_byte_count (void) +{ + return this->sent_byte_count_; +} + void TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp) { |