diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-06-14 18:13:49 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-06-14 18:13:49 +0000 |
commit | 314fcbfe47cc316aa92178caa98efeaa512caa0d (patch) | |
tree | 429110387dde74ee0b5ce0d303596b3a8dd8fee0 /TAO/tao/Transport.cpp | |
parent | cf804eb57c0c927ce04ffc83de80cf589f71cc8f (diff) | |
download | ATCD-314fcbfe47cc316aa92178caa98efeaa512caa0d.tar.gz |
ChangeLogTag:Fri Jun 14 13:58:56 2002 Carlos O'Ryan <coryan@atdesk.com>
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 650 |
1 files changed, 355 insertions, 295 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index ea2de3ca725..13298f4575b 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -104,27 +104,21 @@ TAO_Transport::~TAO_Transport (void) delete this->handler_lock_; - TAO_Queued_Message *i = this->head_; - while (i != 0) + while (this->head_ != 0) { - // @@ This is a good point to insert a flag to indicate that a - // CloseConnection message was successfully received. - i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); - - TAO_Queued_Message *tmp = i; - i = i->next (); - - tmp->destroy (); + TAO_Queued_Message *i = this->head_; + this->head_ = i->next (); + i->destroy (); } - // Avoid making the call if we can. This may be redundant, unless - // someone called handle_close() on the connection handler from - // outside the TAO_Transport. - if (this->cache_map_entry_ != 0) - { - this->orb_core_->lane_resources ().transport_cache ().purge_entry ( + // Avoid making the call if we can. This may be redundant, unless + // someone called handle_close() on the connection handler from + // outside the TAO_Transport. + if (this->cache_map_entry_ != 0) + { + this->orb_core_->lane_resources ().transport_cache ().purge_entry ( this->cache_map_entry_); - } + } } int @@ -186,8 +180,8 @@ dump_iov (iovec *iov, int iovcnt, int id, ACE_Log_Msg::instance ()->acquire (); ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::%s" - " sending %d buffers\n", + "TAO (%P|%t) - Transport[%d]::%s, " + "sending %d buffers\n", id, location, iovcnt)); for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) { @@ -198,8 +192,8 @@ dump_iov (iovec *iov, int iovcnt, int id, iov_len = current_transfer; ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::%s" - " buffer %d/%d has %d bytes\n", + "TAO (%P|%t) - Transport[%d]::%s, " + "buffer %d/%d has %d bytes\n", id, location, i, iovcnt, iov_len)); @@ -209,10 +203,10 @@ dump_iov (iovec *iov, int iovcnt, int id, { ACE_TCHAR header[1024]; ACE_OS::sprintf (header, - ACE_LIB_TEXT("TAO - Transport[%d]::%s (") - ACE_SIZE_T_FORMAT_SPECIFIER ACE_LIB_TEXT("/") - ACE_SIZE_T_FORMAT_SPECIFIER ACE_LIB_TEXT(")\n"), - id, ACE_TEXT_CHAR_TO_TCHAR(location), offset, iov_len); + "TAO - Transport[%d]::%s (" + ACE_SIZE_T_FORMAT_SPECIFIER "/" + ACE_SIZE_T_FORMAT_SPECIFIER ")\n", + id, location, offset, iov_len); len = iov_len - offset; if (len > 512) @@ -225,9 +219,9 @@ dump_iov (iovec *iov, int iovcnt, int id, current_transfer -= iov_len; } ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::%s") - ACE_LIB_TEXT(" end of data\n"), - id, ACE_TEXT_CHAR_TO_TCHAR(location))); + "TAO (%P|%t) - Transport[%d]::%s, " + "end of data\n", + id, location)); ACE_Log_Msg::instance ()->release (); } @@ -239,7 +233,7 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - if (this->check_event_handler_i ("TAO_Transport::send_message_block_chain") == -1) + if (this->check_event_handler_i ("Transport::send_message_block_chain") == -1) return -1; return this->send_message_block_chain_i (mb, @@ -289,144 +283,27 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, } int -TAO_Transport::send_message_i (TAO_Stub *stub, - int is_synchronous, - const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time) +TAO_Transport::send_message_shared (TAO_Stub *stub, + int is_synchronous, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) { - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - - if (this->check_event_handler_i ("TAO_Transport::send_message_i") == -1) - return -1; - - if (is_synchronous) - { - return this->send_synchronous_message_i (message_block, - max_wait_time); - } - - // Let's figure out if the message should be queued without trying - // to send first: - int try_sending_first = 1; - - int queue_empty = (this->head_ == 0); - - if (!queue_empty) - try_sending_first = 0; - else if (stub->sync_strategy ().must_queue (queue_empty)) - try_sending_first = 0; - - ssize_t n; - - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - - if (try_sending_first) - { - size_t byte_count = 0; - // ... in this case we must try to send the message first ... - - size_t total_length = message_block->total_length (); - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ") - ACE_LIB_TEXT("trying to send the message (ml = %d)\n"), - this->id (), total_length)); - } - - // @@ I don't think we want to hold the mutex here, however if - // we release it we need to recheck the status of the transport - // after we return... once I understand the final form for this - // code I will re-visit this decision - n = this->send_message_block_chain_i (message_block, - byte_count, - max_wait_time); - if (n == -1) - { - // ... if this is just an EWOULDBLOCK we must schedule the - // message for later, if it is ETIME we still have to send - // the complete message, because cutting off the message at - // this point will destroy the synchronization with the - // server ... - if (errno != EWOULDBLOCK && errno != ETIME) - { - return -1; - } - } - - // ... let's figure out if the complete message was sent ... - if (total_length == byte_count) - { - // Done, just return. Notice that there are no allocations - // or copies up to this point (though some fancy calling - // back and forth). - // This is the common case for the critical path, it should - // be fast. - return 0; - } - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ") - ACE_LIB_TEXT("partial send %d / %d bytes\n"), - this->id (), byte_count, total_length)); - } - - // ... part of the data was sent, need to figure out what piece - // of the message block chain must be queued ... - while (message_block != 0 && message_block->length () == 0) - message_block = message_block->cont (); - - // ... at least some portion of the message block chain should - // remain ... - ACE_ASSERT (message_block != 0); - } - - // ... either the message must be queued or we need to queue it - // because it was not completely sent out ... - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ") - ACE_LIB_TEXT("message is queued\n"), - this->id ())); - } - - TAO_Queued_Message *queued_message = 0; - ACE_NEW_RETURN (queued_message, - TAO_Asynch_Queued_Message (message_block), - -1); - queued_message->push_back (this->head_, this->tail_); - - // ... if the queue is full we need to activate the output on the - // queue ... - int must_flush = 0; - int constraints_reached = - this->check_buffering_constraints_i (stub, - must_flush); - - // ... but we also want to activate it if the message was partially - // sent.... Plus, when we use the blocking flushing strategy the - // queue is flushed as a side-effect of 'schedule_output()' - - if (constraints_reached || try_sending_first) - { - (void) flushing_strategy->schedule_output (this); - } + int r; + { + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - if (must_flush) - { - typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; - TAO_REVERSE_LOCK reverse (*this->handler_lock_); - ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + if (this->check_event_handler_i ("Transport::send_message_shared") == -1) + return -1; - (void) flushing_strategy->flush_transport (this); - } + r = this->send_message_shared_i (stub, is_synchronous, + message_block, max_wait_time); + } + if (r == -1) + { + this->close_connection (); + } - return 0; + return r; } int @@ -517,8 +394,9 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT("TAO (%P|%t) TAO_Transport::send_synchronous_message_i, ") - ACE_LIB_TEXT("error while flushing message %p\n"), "")); + "TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, " + "error while flushing message %p\n", + this->id (), "")); } return -1; @@ -552,22 +430,20 @@ TAO_Transport::tear_listen_point_list (TAO_InputCDR &) ACE_NOTSUP_RETURN (-1); } - void TAO_Transport::connection_handler_closing (void) { - { - ACE_MT (ACE_GUARD (ACE_Lock, - guard, - *this->handler_lock_)); + // The connection has closed, we must invalidate the handler to + // ensure that any attempt to use this transport results in an + // error. Basically all the other methods in the Transport + // cooperate via check_event_handler_i() - this->transition_handler_state_i (); - } - // Can't hold the lock while we release, b/c the release could - // invoke the destructor! + (void) this->invalidate_event_handler (); + this->send_connection_closed_notifications (); - // This should be the last thing we do here - TAO_Transport::release(this); + // Can't hold the lock while we release, b/c the release could + // invoke the destructor! This should be the last thing we do here + TAO_Transport::release (this); } TAO_Transport* @@ -594,14 +470,21 @@ TAO_Transport::release (TAO_Transport* transport) else if (count < 0) { ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("(%P|%t) TAO_Transport::release, ") - ACE_LIB_TEXT ("reference countis less than zero: %d\n"), - count)); + "TAO (%P|%t) - Transport[%d]::release, " + "reference count is less than zero: %d\n", + transport->id (), count)); ACE_OS::abort (); } } } +// @@ TODO Somebody needs to go throught this code and change those +// stupidly long calls sequences: +// +// this->orb_core_->lane_resources ().transport_cache (). +// +// to a single inline function!! + int TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc) { @@ -637,36 +520,40 @@ TAO_Transport::make_idle (void) void TAO_Transport::close_connection (void) { - this->close_connection_i (); - - // Purge the entry - if (this->cache_map_entry_ != 0) - { - this->orb_core_->lane_resources ().transport_cache ().purge_entry ( - this->cache_map_entry_); - } + ACE_Event_Handler * eh = this->invalidate_event_handler (); + this->close_connection_shared (1, eh); } - void TAO_Transport::close_connection_i (void) { - ACE_Event_Handler *eh = 0; - { - ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); + ACE_Event_Handler * eh = this->invalidate_event_handler_i (); + this->close_connection_shared (1, eh); +} - eh = this->event_handler_i (); +void +TAO_Transport::close_connection_no_purge (void) +{ + ACE_Event_Handler * eh = this->invalidate_event_handler (); + this->close_connection_shared (0, eh); +} - this->transition_handler_state_i (); +void +TAO_Transport::close_connection_shared (int disable_purge, + ACE_Event_Handler * eh) +{ + // Purge the entry + if (!disable_purge && this->cache_map_entry_ != 0) + { + this->orb_core_->lane_resources ().transport_cache ().purge_entry ( + this->cache_map_entry_); + } - if (eh == 0) + if (eh == 0) + { + // The connection was already closed return; - } - - // Close the underlying connection, it is enough to get an - // Event_Handler pointer to do this, so we can factor out the code - // in the base TAO_Transport class. - + } // We first try to remove the handler from the reactor. After that // we destroy the handler using handle_close (). The remove handler @@ -683,10 +570,7 @@ TAO_Transport::close_connection_i (void) (void) eh->handle_close (ACE_INVALID_HANDLE, ACE_Event_Handler::ALL_EVENTS_MASK); - for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) - { - i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); - } + this->send_connection_closed_notifications (); } ssize_t @@ -699,7 +583,7 @@ TAO_Transport::send (iovec *iov, int iovcnt, *this->handler_lock_, -1)); - if (this->check_event_handler_i ("TAO_Transport::send") == -1) + if (this->check_event_handler_i ("Transport::send") == -1) return -1; // now call the template method @@ -716,7 +600,7 @@ TAO_Transport::recv (char *buffer, *this->handler_lock_, -1)); - if (this->check_event_handler_i ("TAO_Transport::recv") == -1) + if (this->check_event_handler_i ("Transport::recv") == -1) return -1; // now call the template method @@ -736,8 +620,9 @@ TAO_Transport::generate_locate_request ( { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("(%P|%t) Error in marshalling the \n") - ACE_LIB_TEXT ("LocateRequest Header \n"))); + "TAO (%P|%t) - Transport[%d]::generate_locate_request, " + "error while marshalling the LocateRequest header\n", + this->id ())); return -1; } @@ -758,8 +643,9 @@ TAO_Transport::generate_request_header ( { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("(%P|%t) Error in marshalling the \n") - ACE_LIB_TEXT ("LocateRequest Header \n"))); + "(%P|%t) - Transport[%d]::generate_request_header, " + "error while marshalling the Request header\n", + this->id())); return -1; } @@ -775,7 +661,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::handle_input\n"), + "TAO (%P|%t) - Transport[%d]::handle_input\n", this->id ())); } @@ -788,10 +674,11 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, { if (TAO_debug_level > 2) ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) Transport::handle_input_i,") - ACE_LIB_TEXT("error while parsing the head of the queue \n"))); + "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "error while parsing the head of the queue\n", + this->id())); - this->tms_->connection_closed (); + this->send_connection_closed_notifications (); } return retval; } @@ -838,7 +725,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, if (n <= 0) { if (n == -1) - this->tms_->connection_closed (); + { + this->send_connection_closed_notifications (); + } return n; } @@ -846,8 +735,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) Read [%d] bytes from transport [%d]\n"), - n, this->id ())); + "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "read %d bytes\n", + this->id (), n)); } // Set the write pointer in the stack buffer @@ -864,8 +754,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, if (retval == -1 && TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport::handle_input_i ") - ACE_LIB_TEXT("error while parsing and consolidating \n"))); + "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "error while parsing and consolidating\n", + this->id ())); } return retval; } @@ -943,9 +834,11 @@ TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) { if (TAO_debug_level > 2) ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) - error in incoming message \n"))); + "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " + "error in incoming message\n", + this->id ())); - this->tms_->connection_closed (); + this->send_connection_closed_notifications (); return -1; } } @@ -984,7 +877,7 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"), + "TAO (%P|%t) - Transport[%d]::consolidate_message\n", this->id ())); } @@ -1013,12 +906,12 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Read [%d] bytes on attempt \n", - n)); + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "read %d bytes on attempt\n", + this->id(), n)); } - if (n == 0 || - n == -1) + if (n == 0 || n == -1) { break; } @@ -1033,10 +926,11 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Trasport::consolidate_message,") - ACE_LIB_TEXT("error while trying to consolidate \n"))); + "TAO (%P|%t) - Trasport[%d]::consolidate_message, " + "error while trying to consolidate\n", + this->id ())); } - this->tms_->connection_closed (); + this->send_connection_closed_notifications (); return -1; } @@ -1056,8 +950,8 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n") - ACE_LIB_TEXT("queueing up the message \n"), + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "queueing up the message\n", this->id ())); } @@ -1154,7 +1048,7 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_message_queue \n"), + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", this->id ())); } @@ -1187,8 +1081,9 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, if (TAO_debug_level) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Error while consolidating... \n"), - ACE_LIB_TEXT("TAO (%P|%t) - .. part of the read message \n"))); + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "error while consolidating, part of the read message\n", + this->id ())); } return retval; } @@ -1277,7 +1172,7 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n"), + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", this->id ())); } @@ -1302,8 +1197,8 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n") - ACE_LIB_TEXT(".............. extracting extra messages \n"), + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " + "extracting extra messages\n", this->id ())); } @@ -1351,11 +1246,11 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) - %p\n"), - ACE_LIB_TEXT ("Close Connection Message recd \n"))); + "TAO (%P|%t) - Transport[%d]::process_parsed_messages, " + "received CloseConnection message %p\n", + this->id(), "")); - // Close the TMS - this->tms_->connection_closed (); + this->send_connection_closed_notifications (); // Return a "-1" so that the next stage can take care of // closing connection and the necessary memory management. @@ -1372,8 +1267,7 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, this, qd) == -1) { - // Close the TMS - this->tms_->connection_closed (); + this->send_connection_closed_notifications (); // Return a "-1" so that the next stage can take care of // closing connection and the necessary memory management. @@ -1395,12 +1289,11 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) - %p\n"), - ACE_LIB_TEXT ("IIOP_Transport::process_message, ") - ACE_LIB_TEXT ("process_reply_message ()"))); + "TAO (%P|%t) - Transport[%d]::process_parsed_messages, " + "error in process_reply_message %p\n", + this->id (), "")); - this->messaging_object ()->reset (); - this->tms_->connection_closed (); + this->send_connection_closed_notifications (); return -1; } @@ -1412,23 +1305,25 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, // every reply on this connection. if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("TAO (%P|%t) : IIOP_Transport::") - ACE_LIB_TEXT ("process_message - ") - ACE_LIB_TEXT ("dispatch reply failed\n"))); + "TAO (%P|%t) - Transport[%d]::process_parsed_messages, " + "dispatch reply failed\n", + this->id ())); - this->messaging_object ()->reset (); - this->tms_->connection_closed (); + this->send_connection_closed_notifications (); return -1; } } else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) { - // Ys, we print out all levels that we are closing the - // connection. - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("(%P|%t) Closing down the connection \n")), - -1); + if (TAO_debug_level) + { + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Transport[%d]::process_parsed_messages, " + "received MessageError, closing connection\n", + this->id ()), + -1); + } } // If not, just return back.. @@ -1489,7 +1384,7 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::process_queue_head \n"), + "TAO (%P|%t) - Transport[%d]::process_queue_head\n", this->id ())); } @@ -1503,8 +1398,8 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO(%P|%t) - Transport[%d]::process_queue_head") - ACE_LIB_TEXT(" the size of the queue is [%d] \n"), + "TAO(%P|%t) - Transport[%d]::process_queue_head, " + "the size of the queue is [%d]\n", this->id (), this->incoming_message_queue_.queue_length())); } @@ -1527,7 +1422,8 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::notify to Reactor\n"), + "TAO (%P|%t) - Transport[%d]::process_queue_header, " + "notify to Reactor\n", this->id ())); } @@ -1544,8 +1440,9 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) // @@todo: need to think about what is the action that // we can take when we get here. ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) - Transport::process_queue_head ") - ACE_LIB_TEXT ("notify to the reactor failed.. \n"))); + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "notify to the reactor failed..\n", + this->id ())); } } } @@ -1623,7 +1520,7 @@ TAO_Transport::schedule_output_i (void) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::schedule_output\n"), + "TAO (%P|%t) - Transport[%d]::schedule_output\n", this->id ())); } @@ -1644,7 +1541,7 @@ TAO_Transport::cancel_output_i (void) if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::cancel_output\n"), + "TAO (%P|%t) - Transport[%d]::cancel_output\n", this->id ())); } @@ -1658,8 +1555,9 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::handle_timeout, ") - ACE_LIB_TEXT("timer expired\n"))); + "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, " + "timer expired\n", + this->id ())); } /// This is the only legal ACT in the current configuration.... @@ -1704,7 +1602,7 @@ TAO_Transport::drain_queue (void) int TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) { - if (this->check_event_handler_i ("TAO_Transport::drain_queue_helper") == -1) + if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1) return -1; size_t byte_count = 0; @@ -1730,8 +1628,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_helper, ") - ACE_LIB_TEXT("send() returns 0"))); + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "send() returns 0", + this->id ())); } return -1; } @@ -1740,9 +1639,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_helper, ") - ACE_LIB_TEXT("%p"), - ACE_LIB_TEXT("send()"))); + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "error during %p\n", + this->id (), "send_i()")); } if (errno == EWOULDBLOCK) return 0; @@ -1756,9 +1655,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_helper, ") - ACE_LIB_TEXT("byte_count = %d, head_is_empty = %d\n"), - byte_count, (this->head_ == 0))); + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "byte_count = %d, head_is_empty = %d\n", + this->id(), byte_count, (this->head_ == 0))); } return 1; } @@ -1790,9 +1689,9 @@ TAO_Transport::drain_queue_i (void) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_i, ") - ACE_LIB_TEXT("helper retval = %d\n"), - retval)); + "TAO (%P|%t) - Transport[%d]::drain_queue_i, " + "helper retval = %d\n", + this->id (), retval)); } if (retval != 1) return retval; @@ -1814,9 +1713,9 @@ TAO_Transport::drain_queue_i (void) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_i, ") - ACE_LIB_TEXT("helper retval = %d\n"), - retval)); + "TAO (%P|%t) - Transport[%d]::drain_queue_i, " + "helper retval = %d\n", + this->id (), retval)); } if (retval != 1) return retval; @@ -1853,9 +1752,9 @@ TAO_Transport::cleanup_queue (size_t byte_count) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::cleanup_queue, ") - ACE_LIB_TEXT("byte_count = %d\n"), - byte_count)); + "TAO (%P|%t) - Transport[%d]::cleanup_queue, " + "byte_count = %d\n", + this->id (), byte_count)); } // Update the state of the first message @@ -1864,9 +1763,9 @@ TAO_Transport::cleanup_queue (size_t byte_count) if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::cleanup_queue, ") - ACE_LIB_TEXT("after transfer, bc = %d, all_sent = %d, ml = %d\n"), - byte_count, i->all_data_sent (), + "TAO (%P|%t) - Transport[%d]::cleanup_queue, " + "after transfer, bc = %d, all_sent = %d, ml = %d\n", + this->id (), byte_count, i->all_data_sent (), i->message_length ())); } @@ -1939,13 +1838,174 @@ TAO_Transport::report_invalid_event_handler (const char *caller) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("(%P|%t) transport %d (tag=%d) %s ") - ACE_LIB_TEXT("no longer associated with handler, ") - ACE_LIB_TEXT("returning -1 with errno = ENOENT\n"), - this->id (), - this->tag_, - ACE_TEXT_CHAR_TO_TCHAR(caller))); + "(%P|%t) - Transport[%d]::report_invalid_event_handler" + "(%s) no longer associated with handler [tag=%d]\n", + this->id (), caller, this->tag_)); + } +} + +ACE_Event_Handler * +TAO_Transport::invalidate_event_handler (void) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0)); + + return this->invalidate_event_handler_i (); +} + +void +TAO_Transport::send_connection_closed_notifications (void) +{ + while (this->head_ != 0) + { + TAO_Queued_Message *i = this->head_; + + // @@ This is a good point to insert a flag to indicate that a + // CloseConnection message was successfully received. + i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); + + this->head_ = i->next (); + + i->destroy (); + } + + this->tms ()->connection_closed (); + this->messaging_object ()->reset (); +} + +int +TAO_Transport::send_message_shared_i (TAO_Stub *stub, + int is_synchronous, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + if (is_synchronous) + { + return this->send_synchronous_message_i (message_block, + max_wait_time); + } + + // Let's figure out if the message should be queued without trying + // to send first: + int try_sending_first = 1; + + int queue_empty = (this->head_ == 0); + + if (!queue_empty) + try_sending_first = 0; + else if (stub->sync_strategy ().must_queue (queue_empty)) + try_sending_first = 0; + + ssize_t n; + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + if (try_sending_first) + { + size_t byte_count = 0; + // ... in this case we must try to send the message first ... + + size_t total_length = message_block->total_length (); + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "trying to send the message (ml = %d)\n", + this->id (), total_length)); + } + + // @@ I don't think we want to hold the mutex here, however if + // we release it we need to recheck the status of the transport + // after we return... once I understand the final form for this + // code I will re-visit this decision + n = this->send_message_block_chain_i (message_block, + byte_count, + max_wait_time); + if (n == -1) + { + // ... if this is just an EWOULDBLOCK we must schedule the + // message for later, if it is ETIME we still have to send + // the complete message, because cutting off the message at + // this point will destroy the synchronization with the + // server ... + if (errno != EWOULDBLOCK && errno != ETIME) + { + return -1; + } + } + + // ... let's figure out if the complete message was sent ... + if (total_length == byte_count) + { + // Done, just return. Notice that there are no allocations + // or copies up to this point (though some fancy calling + // back and forth). + // This is the common case for the critical path, it should + // be fast. + return 0; + } + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "partial send %d / %d bytes\n", + this->id (), byte_count, total_length)); + } + + // ... part of the data was sent, need to figure out what piece + // of the message block chain must be queued ... + while (message_block != 0 && message_block->length () == 0) + message_block = message_block->cont (); + + // ... at least some portion of the message block chain should + // remain ... + ACE_ASSERT (message_block != 0); } + + // ... either the message must be queued or we need to queue it + // because it was not completely sent out ... + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "message is queued\n", + this->id ())); + } + + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message (message_block), + -1); + queued_message->push_back (this->head_, this->tail_); + + // ... if the queue is full we need to activate the output on the + // queue ... + int must_flush = 0; + int constraints_reached = + this->check_buffering_constraints_i (stub, + must_flush); + + // ... but we also want to activate it if the message was partially + // sent.... Plus, when we use the blocking flushing strategy the + // queue is flushed as a side-effect of 'schedule_output()' + + if (constraints_reached || try_sending_first) + { + (void) flushing_strategy->schedule_output (this); + } + + if (must_flush) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + + (void) flushing_strategy->flush_transport (this); + } + + return 0; } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) |