diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 318 |
1 files changed, 179 insertions, 139 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 814bd170f97..68893083f9a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -155,12 +155,16 @@ TAO_Transport::provide_handle (ACE_Handle_Set &handle_set) } static void -dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer) +dump_iov (iovec *iov, int iovcnt, int id, + size_t current_transfer, + const char *location) { + ACE_Log_Msg::instance ()->acquire (); + ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_block_chain" + "TAO (%P|%t) - Transport[%d]::%s" " sending %d buffers\n", - id, iovcnt)); + id, location, iovcnt)); for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) { size_t iov_len = iov[i].iov_len; @@ -170,28 +174,36 @@ dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer) iov_len = current_transfer; ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_block_chain" + "TAO (%P|%t) - Transport[%d]::%s" " buffer %d/%d has %d bytes\n", - id, + id, location, i, iovcnt, iov_len)); + size_t len; for (size_t offset = 0; offset < iov_len; offset += len) { + char header[1024]; + ACE_OS::sprintf (header, + "TAO - Transport[%d]::%s (%d/%d)\n", + id, location, offset, iov_len); + len = iov_len - offset; if (len > 512) len = 512; ACE_HEX_DUMP ((LM_DEBUG, ACE_static_cast(char*,iov[i].iov_base) + offset, len, - "TAO (%P|%t) - Transport::send_message_block_chain ")); + header)); } current_transfer -= iov_len; } ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_mesage_block_chain" + "TAO (%P|%t) - Transport[%d]::%s" " end of data\n", - id)); + id, location)); + + ACE_Log_Msg::instance ()->release (); } int @@ -230,9 +242,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, ssize_t result = this->send (iov, iovcnt, current_transfer, timeout); - if (TAO_debug_level > 6) + if (TAO_debug_level == 2) { - dump_iov (iov, iovcnt, this->id (), current_transfer); + dump_iov (iov, iovcnt, this->id (), + current_transfer, "send_message_block_chain"); } // Add to total bytes transferred. @@ -260,9 +273,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, ssize_t result = this->send (iov, iovcnt, current_transfer, timeout); - if (TAO_debug_level > 6) + if (TAO_debug_level == 2) { - dump_iov (iov, iovcnt, this->id (), current_transfer); + dump_iov (iov, iovcnt, this->id (), + current_transfer, "send_message_block_chain"); } // Add to total bytes transferred. @@ -279,162 +293,155 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, int TAO_Transport::send_message_i (TAO_Stub *stub, - int twoway_flag, + int is_synchronous, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); - int queue_empty = (this->head_ == 0); + if (is_synchronous) + { + return this->send_synchronous_message_i (stub, + message_block, + max_wait_time); + } // Let's figure out if the message should be queued without trying // to send first: - int must_queue = 0; - if (this->head_ != 0) - must_queue = 1; - else if (!twoway_flag - && stub->sync_strategy ().must_queue (queue_empty)) - { - must_queue = 1; - } + int try_sending_first = 1; - if (must_queue) + int queue_empty = (this->head_ == 0); + + if (!queue_empty) + try_sending_first = 0; + else if (stub->sync_strategy ().must_queue (queue_empty)) + try_sending_first = 0; + + size_t byte_count = 0; + ssize_t n; + if (try_sending_first) { - // ... simply queue the message ... + // ... in this case we must try to send the message first ... if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_message_i, " - "message is queued\n", + "trying to send the message\n", this->id ())); } - TAO_Queued_Message *queued_message = 0; - ACE_NEW_RETURN (queued_message, - TAO_Asynch_Queued_Message (message_block), - -1); - queued_message->push_back (this->head_, this->tail_); + // @@ I don't think we want to hold the mutex here, however if + // we release it we need to recheck the status of the transport + // after we return... once I understand the final form for this + // code I will re-visit this decision + n = this->send_message_block_chain (message_block, + byte_count, + max_wait_time); + if (n == 0) + return -1; // EOF + else if (n == -1) + { + // ... if this is just an EWOULDBLOCK we must schedule the + // message for later, if it is ETIME we still have to send + // the complete message, because cutting off the message at + // this point will destroy the synchronization with the + // server ... + if (errno != EWOULDBLOCK && errno != ETIME) + { + return -1; + } + } - if (this->must_flush_queue_i (stub)) + // ... let's figure out if the complete message was sent ... + if (message_block->total_length () == byte_count) { - ace_mon.release (); - int result = flushing_strategy->flush_message (this, - this->tail_); - return result; + // Done, just return. Notice that there are no allocations + // or copies up to this point (though some fancy calling + // back and forth). + // This is the common case for the critical path, it should + // be fast. + return 0; } - return 0; } - // ... in this case we must try to send the message first ... + // ... either the message must be queued or we need to queue it + // because it was not completely sent out ... if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_message_i, " - "trying to send the message\n", + "message is queued\n", this->id ())); } - size_t byte_count; + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message (message_block), + -1); + queued_message->bytes_transferred (byte_count); + queued_message->push_back (this->head_, this->tail_); - // @@ I don't think we want to hold the mutex here, however if - // we release it we need to recheck the status of the transport - // after we return... once I understand the final form for this - // code I will re-visit this decision - ssize_t n = this->send_message_block_chain (message_block, - byte_count, - max_wait_time); - if (n == 0) - return -1; // EOF - else if (n == -1) + // ... if the queue is full we need to activate the output on the + // queue ... + if (this->must_flush_queue_i (stub)) { - // ... if this is just an EWOULDBLOCK we must schedule the - // message for later ... - if (errno != EWOULDBLOCK) - { - return -1; - } + this->orb_core ()->flushing_strategy ()->schedule_output (this); } - // ... let's figure out if the complete message was sent ... - if (message_block->total_length () == byte_count) - { - // Done, just return. Notice that there are no allocations - // or copies up to this point (though some fancy calling - // back and forth). - // This is the common case for the critical path, it should - // be fast. - return 0; - } + // ... in any case, check for timeouts and report them to the + // application ... + if (max_wait_time != 0 && n == -1 && errno == ETIME) + return -1; - // ... the message was only partially sent, schedule reactive - // output... - flushing_strategy->schedule_output (this); + return 0; +} - // ... and set it as the current message ... - if (twoway_flag) - { - // ... we are going to block, so there is no need to clone - // the message block... - // @@ It seems wasteful to allocate a TAO_Queued_Message in - // this case, but it is simpler to do it this way. - TAO_Synch_Queued_Message synch_message (message_block); - - synch_message.bytes_transferred (byte_count); - synch_message.push_back (this->head_, this->tail_); - - // Release the mutex, other threads may modify the queue as we - // block for a long time writing out data. - int result; - { - ace_mon.release (); - result = flushing_strategy->flush_message (this, - &synch_message); - - ace_mon.acquire (); - } - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - synch_message.destroy (); - return result; - } +int +TAO_Transport::send_synchronous_message_i (TAO_Stub *stub, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + // We are going to block, so there is no need to clone + // the message block. + TAO_Synch_Queued_Message synch_message (message_block); - TAO_Queued_Message *queued_message = 0; - ACE_NEW_RETURN (queued_message, - TAO_Asynch_Queued_Message (message_block), - -1); + synch_message.push_back (this->head_, this->tail_); - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - " queued anyway, %d bytes sent\n", - this->id (), - byte_count)); - } + int n = this->drain_queue_i (); + if (n == -1) + return -1; // Error while sending... + else if (n == 1) + return 1; // Empty queue, message was sent.. - // ... insert at the head of the queue, we can use push_back() - // because the queue is empty ... + ACE_ASSERT (n == 0); // Some data sent, but data remains. - queued_message->bytes_transferred (byte_count); - queued_message->push_back (this->head_, this->tail_); + if (synch_message.all_data_sent ()) + return 1; - // ... this is not a twoway. We must check if the buffering - // constraints have been reached, if so, then we should start - // flushing out data.... + // @todo: Check for timeouts! + // if (max_wait_time != 0 && errno == ETIME) return -1; - if (this->must_flush_queue_i (stub)) - { - ace_mon.release (); - int result = flushing_strategy->flush_message (this, - this->tail_); - return result; - } - return 0; + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + (void) flushing_strategy->schedule_output (this); + + // Release the mutex, other threads may modify the queue as we + // block for a long time writing out data. + int result; + { + typedef ACE_Reverse_Lock<TAO_SYNCH_MUTEX> TAO_REVERSE_SYNCH_MUTEX; + TAO_REVERSE_SYNCH_MUTEX reverse (this->queue_mutex_); + + ACE_GUARD_RETURN (TAO_REVERSE_SYNCH_MUTEX, ace_mon, reverse, -1); + result = flushing_strategy->flush_message (this, + &synch_message); + + } + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return result; } int @@ -547,18 +554,29 @@ TAO_Transport::make_idle (void) void TAO_Transport::close_connection (void) { - ACE_MT (ACE_GUARD (ACE_Lock, - guard, - *this->handler_lock_)); + ACE_Event_Handler *eh = 0; + { + ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); - // Call handle close on the handler. - // The event handler is as common as we can get - ACE_Event_Handler *eh = this->event_handler_i (); - if (eh) - eh->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::ALL_EVENTS_MASK); + eh = this->event_handler_i (); + + this->transition_handler_state_i (); + + if (eh == 0) + return; + } + + // Close the underlying connection, it is enough to get an + // Event_Handler pointer to do this, so we can factor out the code + // in the base TAO_Transport class. + (void) eh->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::ALL_EVENTS_MASK); // Purge the entry + // @todo This is redundant, handle_close() eventually calls + // this->connection_handler_closing(), that performs the same + // work, for some reason they hold the mutex while they do + // that work though. this->orb_core_->transport_cache ().purge_entry (this->cache_map_entry_); for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) @@ -580,11 +598,14 @@ TAO_Transport::send (iovec *iov, int iovcnt, // if there's no associated event handler, then we act like a null transport if (this->event_handler_i () == 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ") - ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"), - this->id (), - this->tag_)); + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ") + ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"), + this->id (), + this->tag_)); + } errno = ENOENT; return -1; } @@ -716,6 +737,12 @@ TAO_Transport::drain_queue (void) { ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); + return this->drain_queue_i (); +} + +int +TAO_Transport::drain_queue_i (void) +{ if (this->head_ == 0) return 1; @@ -743,6 +770,12 @@ TAO_Transport::drain_queue (void) ssize_t retval = this->send (iov, iovcnt, byte_count); + if (TAO_debug_level == 2) + { + dump_iov (iov, iovcnt, this->id (), + byte_count, "drain_queue_i"); + } + // ... now we need to update the queue, removing elements // that have been sent, and updating the last element if it // was only partially sent ... @@ -775,6 +808,12 @@ TAO_Transport::drain_queue (void) ssize_t retval = this->send (iov, iovcnt, byte_count); + if (TAO_debug_level == 2) + { + dump_iov (iov, iovcnt, this->id (), + byte_count, "drain_queue_i"); + } + this->cleanup_queue (byte_count); iovcnt = 0; @@ -788,6 +827,7 @@ TAO_Transport::drain_queue (void) return 0; return -1; } + ACE_ASSERT (byte_count != 0); if (this->head_ == 0) return 1; |